# -*- coding: utf-8 -*-
# cython: language_level=3
"""
File and Stream Operations
"""
import io
import math
import mpmath
import os
import struct
import sympy
import tempfile
from io import BytesIO
import os.path as osp
from itertools import chain
from mathics.version import __version__ # noqa used in loading to check consistency.
from mathics_scanner.errors import IncompleteSyntaxError, InvalidSyntaxError
from mathics_scanner import TranslateError
from mathics.core.parser import MathicsFileLineFeeder, MathicsMultiLineFeeder, parse
from mathics.core.expression import (
BoxError,
Complex,
BaseExpression,
Expression,
Integer,
MachineReal,
Real,
String,
Symbol,
SymbolFailed,
SymbolNull,
SymbolTrue,
from_mpmath,
from_python,
)
from mathics.core.numbers import dps
from mathics.core.streams import (
Stream,
path_search,
stream_manager,
)
import mathics
from mathics.builtin.base import Builtin, Predefined, BinaryOperator, PrefixOperator
from mathics.builtin.strings import to_python_encoding
from mathics.builtin.base import MessageException
# Working directory of the process at the moment this module is imported.
INITIAL_DIR = os.getcwd()
# List seeded with the initial working directory.
# NOTE(review): presumably used as a directory stack by builtins defined
# elsewhere in the file -- it is not touched in this part of the module.
DIRECTORY_STACK = [INITIAL_DIR]
# Value returned by the `$Input` builtin (see Input.evaluate).
INPUT_VAR = ""
# Value returned by the `$InputFileName` builtin; mathics_open overwrites it
# with the absolute path of the file it opens and resets it on exit.
INPUTFILE_VAR = ""
# System temporary directory, as reported by the tempfile module.
TMP_DIR = tempfile.gettempdir()
# The Mathics symbol `$Path`.
SymbolPath = Symbol("$Path")
def _channel_to_stream(channel, mode="r"):
    """Turn a channel specification into a stream expression.

    channel: either a String holding a file name, or an expression of the
        form InputStream[name, n] / OutputStream[name, n].
    mode: Python open mode used when `channel` is a file name.

    Returns an ``InputStream[channel, n]`` or ``OutputStream[channel, n]``
    expression for a freshly opened file, the channel itself when it already
    is a two-argument stream expression, and None for anything else.

    Raises ValueError when a file name is given together with an unknown
    mode.  The mode is validated *before* the file is opened so that a bad
    mode cannot leave an open, registered stream behind.
    """
    if isinstance(channel, String):
        if mode in ("r", "rb"):
            head = "InputStream"
        elif mode in ("w", "a", "wb", "ab"):
            head = "OutputStream"
        else:
            raise ValueError(f"Unknown format {mode}")

        opener = mathics_open(channel.get_string_value(), mode)
        # Entered without a matching __exit__: the stream must stay open
        # after this function returns.
        opener.__enter__()
        return Expression(head, channel, Integer(opener.n))

    if channel.has_form("InputStream", 2) or channel.has_form("OutputStream", 2):
        return channel

    return None
class mathics_open(Stream):
    """Context manager that opens a file and registers it as a Mathics stream.

    While the stream is entered, the module-level INPUTFILE_VAR (the value of
    `$InputFileName`) holds the absolute path of the opened file.
    """

    def __init__(self, name, mode="r", encoding=None):
        """
        name: file name; resolved with path_search() when the stream is entered.
        mode: Python open mode.
        encoding: Mathics character-encoding name, or None.

        Raises MessageException("General", "charcode", encoding) when a text
        mode is combined with an encoding name that to_python_encoding()
        cannot translate.
        """
        if encoding is not None:
            requested_encoding = encoding
            encoding = to_python_encoding(encoding)
            if "b" in mode:
                # We should not specify an encoding for a binary mode
                encoding = None
            elif encoding is None:
                # Report the name the caller asked for; self.encoding is not
                # assigned yet at this point.
                raise MessageException("General", "charcode", requested_encoding)
        self.encoding = encoding
        super().__init__(name, mode, self.encoding)
        self.old_inputfile_var = None  # Set in __enter__ and __exit__

    def __enter__(self):
        """Open the file, register it with stream_manager and return the
        Python file object.  Raises IOError when the file cannot be located
        and the mode is not a writing mode."""
        global INPUTFILE_VAR

        # find path
        path = path_search(self.name)
        if path is None and self.mode in ["w", "a", "wb", "ab"]:
            # A file that is about to be created need not exist yet.
            path = self.name
        if path is None:
            raise IOError

        # open the stream
        fp = io.open(path, self.mode, encoding=self.encoding)

        # Remember the file that was being read before, so that __exit__ can
        # restore it (matters when one file loads another).
        self.old_inputfile_var = INPUTFILE_VAR
        INPUTFILE_VAR = osp.abspath(path)

        stream_manager.add(
            name=path,
            mode=self.mode,
            encoding=self.encoding,
            io=fp,
            num=stream_manager.next,
        )
        return fp

    def __exit__(self, type, value, traceback):
        """Restore the previous `$InputFileName` value and delegate the rest
        of the cleanup to Stream.__exit__."""
        global INPUTFILE_VAR
        INPUTFILE_VAR = self.old_inputfile_var or ""
        super().__exit__(type, value, traceback)
class Input(Predefined):
    """
    - '$Input'
    - is the name of the stream from which input is currently being read.
    >> $Input
    = #<--#
    """

    name = "$Input"
    attributes = ("Protected", "ReadProtected")

    def evaluate(self, evaluation):
        # INPUT_VAR is only read here, so no ``global`` declaration is needed.
        return String(INPUT_VAR)
class InputFileName(Predefined):
    """
    - '$InputFileName'
    - is the name of the file from which input is currently being read.
    While in interactive mode, '$InputFileName' is "".
    X> $InputFileName
    """

    name = "$InputFileName"

    def evaluate(self, evaluation):
        # INPUTFILE_VAR is maintained by mathics_open; it is only read here,
        # so no ``global`` declaration is needed.
        return String(INPUTFILE_VAR)
class EndOfFile(Builtin):
    """
    - 'EndOfFile'
    - is returned by 'Read' when the end of an input stream is reached.
    """

    # Declares the symbol only; it carries no rules or evaluation code.
    # Read.apply returns it (through SymbolEndOfFile) when a read hits EOFError.
# Symbol object for EndOfFile; Read.apply returns it at end of input and also
# uses it internally as a "parsing failed" marker.
SymbolEndOfFile = Symbol("EndOfFile")
# TODO: Improve docs for these Read[] arguments.
class Byte(Builtin):
    """
    - 'Byte'
    - is a data type for 'Read'.
    """

    # Type marker only.  For this type Read.apply reads one character from
    # the stream and returns its code point (ord).
class Character(Builtin):
    """
    - 'Character'
    - is a data type for 'Read'.
    """

    # Type marker only.  For this type Read.apply reads one character from
    # the stream and returns it unchanged.
class Expression_(Builtin):
    """
    - 'Expression'
    - is a data type for 'Read'.
    """

    # Type marker only.  For this type Read.apply reads records and parses
    # them as a Mathics expression.
    # NOTE(review): the trailing underscore in the class name presumably
    # avoids shadowing the imported Expression class; `name` gives the
    # builtin its real symbol name.
    name = "Expression"
class Number_(Builtin):
    """
    - 'Number'
    - is a data type for 'Read'.
    """

    # Type marker only.  For this type Read.apply reads a token and converts
    # it with int(), falling back to float().
    # `name` gives the builtin its symbol name, which differs from the
    # Python class name.
    name = "Number"
class Record(Builtin):
    """
    - 'Record'
    - is a data type for 'Read'.
    """

    # Type marker only.  For this type Read.apply returns the text up to the
    # next separator listed in the RecordSeparators option.
class Word(Builtin):
    """
    - 'Word'
    - is a data type for 'Read'.
    """

    # Type marker only.  For this type Read.apply returns the text up to the
    # next separator listed in the WordSeparators option.
class Read(Builtin):
"""
- 'Read[$stream$]'
- reads the input stream and returns one expression.
- 'Read[$stream$, $type$]'
- reads the input stream and returns an object of the given type.
- 'Read[$stream$, $type$]'
- reads the input stream and returns an object of the given type.
- 'Read[$stream$, Hold[Expression]]'
- reads the input stream for an Expression and puts it inside 'Hold'.
$type$ is one of:
- Byte
- Character
- Expression
- HoldExpression
- Number
- Real
- Record
- String
- Word
## Malformed InputString
#> Read[InputStream[String], {Word, Number}]
= Read[InputStream[String], {Word, Number}]
## Correctly formed InputString but not open
#> Read[InputStream[String, -1], {Word, Number}]
: InputStream[String, -1] is not open.
= Read[InputStream[String, -1], {Word, Number}]
## Reading Strings
>> stream = StringToStream["abc123"];
>> Read[stream, String]
= abc123
#> Read[stream, String]
= EndOfFile
#> Close[stream];
## Reading Words
>> stream = StringToStream["abc 123"];
>> Read[stream, Word]
= abc
>> Read[stream, Word]
= 123
#> Read[stream, Word]
= EndOfFile
#> Close[stream];
#> stream = StringToStream[""];
#> Read[stream, Word]
= EndOfFile
#> Read[stream, Word]
= EndOfFile
#> Close[stream];
## Number
>> stream = StringToStream["123, 4"];
>> Read[stream, Number]
= 123
>> Read[stream, Number]
= 4
#> Read[stream, Number]
= EndOfFile
#> Close[stream];
#> stream = StringToStream["123xyz 321"];
#> Read[stream, Number]
= 123
#> Quiet[Read[stream, Number]]
= $Failed
## Real
#> stream = StringToStream["123, 4abc"];
#> Read[stream, Real]
= 123.
#> Read[stream, Real]
= 4.
#> Quiet[Read[stream, Number]]
= $Failed
#> Close[stream];
#> stream = StringToStream["1.523E-19"]; Read[stream, Real]
= 1.523*^-19
#> Close[stream];
#> stream = StringToStream["-1.523e19"]; Read[stream, Real]
= -1.523*^19
#> Close[stream];
#> stream = StringToStream["3*^10"]; Read[stream, Real]
= 3.*^10
#> Close[stream];
#> stream = StringToStream["3.*^10"]; Read[stream, Real]
= 3.*^10
#> Close[stream];
## Expression
#> stream = StringToStream["x + y Sin[z]"]; Read[stream, Expression]
= x + y Sin[z]
#> Close[stream];
## #> stream = Quiet[StringToStream["Sin[1 123"]; Read[stream, Expression]]
## = $Failed
## HoldExpression:
>> stream = StringToStream["2+2\\n2+3"];
'Read' with a 'Hold[Expression]' returns the expression it reads unevaluated so it can be later inspected and evaluated:
>> Read[stream, Hold[Expression]]
= Hold[2 + 2]
>> Read[stream, Expression]
= 5
>> Close[stream];
Reading a comment, however, will return the empty list:
>> stream = StringToStream["(* ::Package:: *)"];
>> Read[stream, Hold[Expression]]
= {}
>> Close[stream];
## Multiple types
>> stream = StringToStream["123 abc"];
>> Read[stream, {Number, Word}]
= {123, abc}
#> Read[stream, {Number, Word}]
= EndOfFile
#> Close[stream];
#> stream = StringToStream["123 abc"];
#> Quiet[Read[stream, {Word, Number}]]
= $Failed
#> Close[stream];
#> stream = StringToStream["123 123"]; Read[stream, {Real, Number}]
= {123., 123}
#> Close[stream];
#> Quiet[Read[stream, {Real}]]
= Read[InputStream[String, ...], {Real}]
Multiple lines:
>> stream = StringToStream["\\"Tengo una\\nvaca lechera.\\""]; Read[stream]
= Tengo una
. vaca lechera.
"""
# Message templates; `1`, `2` are filled in by evaluation.message().
messages = {
"openx": "`1` is not open.",
"readf": "`1` is not a valid format specification.",
"readn": "Invalid real number found when reading from `1`.",
"readt": "Invalid input found when reading `1` from `2`.",
"intnm": (
"Non-negative machine-sized integer expected at " "position 3 in `1`."
),
}
# Read[stream] is rewritten to read one Expression.
rules = {
"Read[stream_]": "Read[stream, Expression]",
}
# Option defaults, written as Mathics source text.  Read.apply currently
# uses only RecordSeparators and WordSeparators.
options = {
"NullRecords": "False",
"NullWords": "False",
"RecordSeparators": '{"\r\n", "\n", "\r"}',
"TokenWords": "{}",
"WordSeparators": '{" ", "\t"}',
}
attributes = "Protected"
def check_options(self, options):
    """Convert the recognised Read options to plain Python values.

    options: mapping from fully qualified option names ("System`...") to
        objects offering ``to_python()``.

    Returns a dict keyed by the short option name.  Only options present in
    `options` appear in the result.  Separator lists come back with the
    surrounding double quotes of each entry removed.  Invalid values trip an
    assertion.
    """
    # TODO Proper error messages

    def boolean(value):
        assert value in [True, False]
        return value

    def separators(value):
        assert isinstance(value, list)
        assert all(isinstance(s, str) and s[0] == s[-1] == '"' for s in value)
        # Each entry still carries its double quotes; drop them.
        return [s[1:-1] for s in value]

    def empty_list(value):
        assert value == []
        return value

    # (short option name, validator), in the order the options are checked.
    validators = (
        ("AnchoredSearch", boolean),
        ("IgnoreCase", boolean),
        ("WordSearch", boolean),
        ("RecordSeparators", separators),
        ("WordSeparators", separators),
        ("NullRecords", boolean),
        ("NullWords", boolean),
        ("TokenWords", empty_list),
    )

    return {
        key: validate(options["System`" + key].to_python())
        for key, validate in validators
        if "System`" + key in options
    }
def apply(self, channel, types, evaluation, options):
    "Read[channel_, types_, OptionsPattern[Read]]"
    # Reads one object per entry of `types` from `channel`.
    #
    # Returns the object read (or a list of them when several types, or
    # none, produced a value), EndOfFile when the stream is exhausted,
    # $Failed on malformed input or an invalid type, and None (expression
    # left unevaluated) when the channel is unusable.

    if channel.has_form("OutputStream", 2):
        evaluation.message("General", "openw", channel)
        return

    strm = _channel_to_stream(channel, "r")
    if strm is None:
        return

    [name, n] = strm.get_leaves()

    stream = stream_manager.lookup_stream(n.get_int_value())
    if stream is None:
        evaluation.message("Read", "openx", strm)
        return
    if stream.io is None:
        stream.__enter__()
    if stream.io.closed:
        evaluation.message("Read", "openx", strm)
        return

    # Wrap types in a list (if it isn't already one)
    if types.has_form("List", None):
        types = types._leaves
    else:
        types = (types,)

    # TODO: look for a better implementation handling "Hold[Expression]".
    #
    # Hold[Expression] is mapped to the internal HoldExpression type.  The
    # length check keeps an argument-less Hold[] from raising IndexError; it
    # is then rejected below as an invalid format specification.
    types = (
        Symbol("HoldExpression")
        if (
            typ.get_head_name() == "System`Hold"
            and len(typ.leaves) > 0
            and typ.leaves[0].get_name() == "System`Expression"
        )
        else typ
        for typ in types
    )
    types = Expression("List", *types)

    READ_TYPES = [
        Symbol(k)
        for k in [
            "Byte",
            "Character",
            "Expression",
            "HoldExpression",
            "Number",
            "Real",
            "Record",
            "String",
            "Word",
        ]
    ]

    for typ in types.leaves:
        if typ not in READ_TYPES:
            evaluation.message("Read", "readf", typ)
            return SymbolFailed

    # Options
    # TODO Implement extra options (NullRecords, NullWords, TokenWords)
    py_options = self.check_options(options)
    record_separators = py_options["RecordSeparators"]
    word_separators = py_options["WordSeparators"]

    name = name.to_python()

    result = []

    def stream_expr():
        # Stream expression shown in the "readt" / "readn" messages.
        return Expression("InputStream", name, n)

    def reader(stream, word_separators, accepted=None):
        # Generator yielding successive tokens read one character at a time.
        # A token ends at a separator, at end of input, or -- when `accepted`
        # is given -- at the first character outside `accepted`.
        # Raises EOFError once nothing more can be read.
        while True:
            word = ""
            while True:
                try:
                    tmp = stream.io.read(1)
                except UnicodeDecodeError:
                    tmp = " "  # ignore
                    evaluation.message("General", "ucdec")

                if tmp == "":
                    if word == "":
                        # Probe once more: only a read that does not move
                        # the stream position counts as end of file.
                        pos = stream.io.tell()
                        newchar = stream.io.read(1)
                        if pos == stream.io.tell():
                            raise EOFError
                        else:
                            if newchar:
                                word = newchar
                                continue
                            else:
                                yield word
                                continue
                    last_word = word
                    word = ""
                    yield last_word
                    break

                if tmp in word_separators:
                    if word == "":
                        # Skip leading separators.
                        continue
                    if stream.io.seekable():
                        # Push the separator back for the next reader.
                        stream.io.seek(stream.io.tell() - 1)
                    last_word = word
                    word = ""
                    yield last_word
                    break

                if accepted is not None and tmp not in accepted:
                    last_word = word
                    word = ""
                    yield last_word
                    break

                word += tmp

    read_word = reader(stream, word_separators)
    read_record = reader(stream, record_separators)
    read_number = reader(
        stream,
        word_separators + record_separators,
        ["+", "-", "."] + [str(i) for i in range(10)],
    )
    read_real = reader(
        stream,
        word_separators + record_separators,
        ["+", "-", ".", "e", "E", "^", "*"] + [str(i) for i in range(10)],
    )

    for typ in types.leaves:
        try:
            if typ == Symbol("Byte"):
                tmp = stream.io.read(1)
                if tmp == "":
                    raise EOFError
                result.append(ord(tmp))
            elif typ == Symbol("Character"):
                tmp = stream.io.read(1)
                if tmp == "":
                    raise EOFError
                result.append(tmp)
            elif typ == Symbol("Expression") or typ == Symbol("HoldExpression"):
                tmp = next(read_record)
                while True:
                    try:
                        feeder = MathicsMultiLineFeeder(tmp)
                        expr = parse(evaluation.definitions, feeder)
                        break
                    except (IncompleteSyntaxError, InvalidSyntaxError):
                        # The expression may continue on the next record.
                        try:
                            nextline = next(read_record)
                            tmp = tmp + "\n" + nextline
                        except EOFError:
                            expr = SymbolEndOfFile
                            break
                    except Exception as e:
                        # Re-parsing the same text would raise the same error
                        # forever; report it and treat the input as invalid.
                        print(e)
                        expr = SymbolEndOfFile
                        break

                if expr == SymbolEndOfFile:
                    evaluation.message("Read", "readt", tmp, stream_expr())
                    return SymbolFailed
                elif isinstance(expr, BaseExpression):
                    if typ == Symbol("HoldExpression"):
                        expr = Expression("Hold", expr)
                    result.append(expr)
                # Anything else (e.g. nothing was parsed because the record
                # held only a comment) contributes no result.
            elif typ == Symbol("Number"):
                tmp = next(read_number)
                try:
                    tmp = int(tmp)
                except ValueError:
                    try:
                        tmp = float(tmp)
                    except ValueError:
                        evaluation.message("Read", "readn", stream_expr())
                        return SymbolFailed
                result.append(tmp)
            elif typ == Symbol("Real"):
                tmp = next(read_real)
                # Accept the Mathics exponent notation 1.5*^3.
                tmp = tmp.replace("*^", "E")
                try:
                    tmp = float(tmp)
                except ValueError:
                    evaluation.message("Read", "readn", stream_expr())
                    return SymbolFailed
                result.append(tmp)
            elif typ == Symbol("Record"):
                result.append(next(read_record))
            elif typ == Symbol("String"):
                tmp = stream.io.readline()
                if len(tmp) == 0:
                    raise EOFError
                result.append(tmp.rstrip("\n"))
            elif typ == Symbol("Word"):
                result.append(next(read_word))
        except EOFError:
            return SymbolEndOfFile
        except UnicodeDecodeError:
            evaluation.message("General", "ucdec")

    if len(result) == 1:
        return from_python(*result)
    return from_python(result)
def apply_nostream(self, arg1, arg2, evaluation):
    "Read[arg1_, arg2_]"
    # Emits the General::stream message for the first argument.  Falling off
    # the end returns None, which leaves the expression unevaluated.
    evaluation.message("General", "stream", arg1)
class Write(Builtin):
    """
    - 'Write[$channel$, $expr1$, $expr2$, ...]'
    - writes the expressions to the output channel followed by a newline.
    >> stream = OpenWrite[]
    = ...
    >> Write[stream, 10 x + 15 y ^ 2]
    >> Write[stream, 3 Sin[z]]
    >> Close[stream]
    = ...
    >> stream = OpenRead[%];
    >> ReadList[stream]
    = {10 x + 15 y ^ 2, 3 Sin[z]}
    #> Close[stream];
    """

    attributes = "Protected"

    def apply(self, channel, expr, evaluation):
        "Write[channel_, expr___]"
        # A channel given as a file name has to be opened for writing; the
        # helper's default mode "r" would yield a read-only stream on which
        # the write below can never succeed.
        strm = _channel_to_stream(channel, "w")

        if strm is None:
            return

        n = strm.leaves[1].get_int_value()
        stream = stream_manager.lookup_stream(n)

        if stream is None or stream.io is None or stream.io.closed:
            evaluation.message("General", "openx", channel)
            return SymbolNull

        # All expressions go on one line, formatted as text.
        expr = expr.get_sequence()
        expr = Expression("Row", Expression("List", *expr))

        evaluation.format = "text"
        text = evaluation.format_output(from_python(expr))
        stream.io.write(str(text) + "\n")
        return SymbolNull
class _BinaryFormat(object):
    """
    Container for BinaryRead readers and BinaryWrite writers

    Each ``_<Type>_reader(s)`` static method reads one value of the binary
    type ``<Type>`` from the binary stream ``s`` and returns it as a Mathics
    object.  Each ``_<Type>_writer(s, x)`` static method writes the Python
    value ``x`` to ``s``.  get_readers() / get_writers() collect them into
    dictionaries keyed by ``<Type>``.

    Readers built on struct.unpack raise struct.error when the stream does
    not supply enough bytes.
    """

    @staticmethod
    def _IEEE_real(real):
        "Convert a Python float to Real, DirectedInfinity or Indeterminate."
        if math.isnan(real):
            return Symbol("Indeterminate")
        elif math.isinf(real):
            return Expression("DirectedInfinity", Integer((-1) ** (real < 0)))
        else:
            return Real(real)

    @staticmethod
    def _IEEE_cmplx(real, imag):
        "Convert two Python floats to Complex, DirectedInfinity or Indeterminate."
        if math.isnan(real) or math.isnan(imag):
            return Symbol("Indeterminate")
        elif math.isinf(real) or math.isinf(imag):
            if math.isinf(real) and math.isinf(imag):
                return Symbol("Indeterminate")
            # Only the infinite component contributes to the direction.
            return Expression(
                "DirectedInfinity",
                Expression(
                    "Complex",
                    (-1) ** (real < 0) if math.isinf(real) else 0,
                    (-1) ** (imag < 0) if math.isinf(imag) else 0,
                ),
            )
        else:
            return Complex(MachineReal(real), MachineReal(imag))

    @classmethod
    def get_readers(cls):
        "Return a dict mapping type name to reader function."
        readers = {}
        for funcname in dir(cls):
            if funcname.startswith("_") and funcname.endswith("_reader"):
                # Strip the leading "_" and the trailing "_reader".
                readers[funcname[1:-7]] = getattr(cls, funcname)
        return readers

    @classmethod
    def get_writers(cls):
        "Return a dict mapping type name to writer function."
        writers = {}
        for funcname in dir(cls):
            if funcname.startswith("_") and funcname.endswith("_writer"):
                # Strip the leading "_" and the trailing "_writer".
                writers[funcname[1:-7]] = getattr(cls, funcname)
        return writers

    # Reader Functions

    @staticmethod
    def _Byte_reader(s):
        "8-bit unsigned integer"
        return Integer(*struct.unpack("B", s.read(1)))

    @staticmethod
    def _Character8_reader(s):
        "8-bit character"
        return String(struct.unpack("c", s.read(1))[0].decode("ascii"))

    @staticmethod
    def _Character16_reader(s):
        "16-bit character"
        return String(chr(*struct.unpack("H", s.read(2))))

    @staticmethod
    def _Complex64_reader(s):
        "IEEE single-precision complex number"
        return _BinaryFormat._IEEE_cmplx(*struct.unpack("ff", s.read(8)))

    @staticmethod
    def _Complex128_reader(s):
        "IEEE double-precision complex number"
        return _BinaryFormat._IEEE_cmplx(*struct.unpack("dd", s.read(16)))

    @staticmethod
    def _Complex256_reader(s):
        "IEEE quad-precision complex number"
        # Static like every other reader: get_readers() hands the function
        # out unbound, so it must be callable with the stream alone.
        real = _BinaryFormat._Real128_reader(s)
        imag = _BinaryFormat._Real128_reader(s)
        return Complex(real, imag)

    @staticmethod
    def _Integer8_reader(s):
        "8-bit signed integer"
        return Integer(*struct.unpack("b", s.read(1)))

    @staticmethod
    def _Integer16_reader(s):
        "16-bit signed integer"
        return Integer(*struct.unpack("h", s.read(2)))

    @staticmethod
    def _Integer24_reader(s):
        "24-bit signed integer"
        b = s.read(3)
        # struct has no 3-byte format: place the three little-endian bytes
        # in the upper part of a signed 32-bit value, then shift back down
        # arithmetically so the sign bit is extended.
        return Integer(struct.unpack("<i", b"\x00" + b)[0] >> 8)

    @staticmethod
    def _Integer32_reader(s):
        "32-bit signed integer"
        return Integer(*struct.unpack("i", s.read(4)))

    @staticmethod
    def _Integer64_reader(s):
        "64-bit signed integer"
        return Integer(*struct.unpack("q", s.read(8)))

    @staticmethod
    def _Integer128_reader(s):
        "128-bit signed integer"
        # Low word unsigned, high word signed.
        a, b = struct.unpack("Qq", s.read(16))
        return Integer((b << 64) + a)

    @staticmethod
    def _Real32_reader(s):
        "IEEE single-precision real number"
        return _BinaryFormat._IEEE_real(*struct.unpack("f", s.read(4)))

    @staticmethod
    def _Real64_reader(s):
        "IEEE double-precision real number"
        return _BinaryFormat._IEEE_real(*struct.unpack("d", s.read(8)))

    @staticmethod
    def _Real128_reader(s):
        "IEEE quad-precision real number"
        # Workaround quad missing from struct
        # correctness is not guaranteed
        b = s.read(16)
        sig, sexp = b[:14], b[14:]

        # Sign / Exponent
        (sexp,) = struct.unpack("H", sexp)
        signbit = sexp // 0x8000
        expbits = sexp % 0x8000

        # Significand
        fracbits = int.from_bytes(sig, byteorder="little")

        if expbits == 0x0000 and fracbits == 0:
            return Real(sympy.Float(0, 4965))
        elif expbits == 0x7FFF:
            if fracbits == 0:
                return Expression("DirectedInfinity", Integer((-1) ** signbit))
            else:
                return Symbol("Indeterminate")

        with mpmath.workprec(112):
            core = mpmath.fdiv(fracbits, 2 ** 112)
            if expbits == 0x000:
                # Subnormal: no implicit leading 1.
                assert fracbits != 0
                exp = -16382
                core = mpmath.fmul((-1) ** signbit, core)
            else:
                assert 0x0001 <= expbits <= 0x7FFE
                exp = expbits - 16383
                core = mpmath.fmul((-1) ** signbit, mpmath.fadd(1, core))

            if exp >= 0:
                result = mpmath.fmul(core, 2 ** exp)
            else:
                result = mpmath.fdiv(core, 2 ** -exp)

            return from_mpmath(result, dps(112))

    @staticmethod
    def _TerminatedString_reader(s):
        "null-terminated string of 8-bit characters"
        b = s.read(1)
        contents = b""
        while b != b"\x00":
            if b == b"":
                # End of stream before the terminator.
                raise struct.error
            contents += b
            b = s.read(1)
        return String(contents.decode("ascii"))

    @staticmethod
    def _UnsignedInteger8_reader(s):
        "8-bit unsigned integer"
        return Integer(*struct.unpack("B", s.read(1)))

    @staticmethod
    def _UnsignedInteger16_reader(s):
        "16-bit unsigned integer"
        return Integer(*struct.unpack("H", s.read(2)))

    @staticmethod
    def _UnsignedInteger24_reader(s):
        "24-bit unsigned integer"
        return Integer(*struct.unpack("I", s.read(3) + b"\0"))

    @staticmethod
    def _UnsignedInteger32_reader(s):
        "32-bit unsigned integer"
        return Integer(*struct.unpack("I", s.read(4)))

    @staticmethod
    def _UnsignedInteger64_reader(s):
        "64-bit unsigned integer"
        return Integer(*struct.unpack("Q", s.read(8)))

    @staticmethod
    def _UnsignedInteger128_reader(s):
        "128-bit unsigned integer"
        a, b = struct.unpack("QQ", s.read(16))
        return Integer((b << 64) + a)

    # Writer Functions

    @staticmethod
    def _Byte_writer(s, x):
        "8-bit unsigned integer"
        s.write(struct.pack("B", x))

    @staticmethod
    def _Character8_writer(s, x):
        "8-bit character"
        s.write(struct.pack("c", x.encode("ascii")))

    # TODO
    # @staticmethod
    # def _Character16_writer(s, x):
    #     "16-bit character"
    #     pass

    @staticmethod
    def _Complex64_writer(s, x):
        "IEEE single-precision complex number"
        s.write(struct.pack("ff", x.real, x.imag))

    @staticmethod
    def _Complex128_writer(s, x):
        "IEEE double-precision complex number"
        s.write(struct.pack("dd", x.real, x.imag))

    # TODO
    # @staticmethod
    # def _Complex256_writer(s, x):
    #     "IEEE quad-precision complex number"
    #     pass

    @staticmethod
    def _Integer8_writer(s, x):
        "8-bit signed integer"
        s.write(struct.pack("b", x))

    @staticmethod
    def _Integer16_writer(s, x):
        "16-bit signed integer"
        s.write(struct.pack("h", x))

    @staticmethod
    def _Integer24_writer(s, x):
        "24-bit signed integer"
        # Pack as 32 bits shifted up by one byte, then drop the first byte.
        s.write(struct.pack("i", x << 8)[1:])

    @staticmethod
    def _Integer32_writer(s, x):
        "32-bit signed integer"
        s.write(struct.pack("i", x))

    @staticmethod
    def _Integer64_writer(s, x):
        "64-bit signed integer"
        s.write(struct.pack("q", x))

    @staticmethod
    def _Integer128_writer(s, x):
        "128-bit signed integer"
        a, b = x & 0xFFFFFFFFFFFFFFFF, x >> 64
        s.write(struct.pack("Qq", a, b))

    @staticmethod
    def _Real32_writer(s, x):
        "IEEE single-precision real number"
        s.write(struct.pack("f", x))

    @staticmethod
    def _Real64_writer(s, x):
        "IEEE double-precision real number"
        s.write(struct.pack("d", x))

    # TODO
    # @staticmethod
    # def _Real128_writer(s, x):
    #     "IEEE quad-precision real number"
    #     pass

    @staticmethod
    def _TerminatedString_writer(s, x):
        "null-terminated string of 8-bit characters"
        # The caller (BinaryWrite.apply) appends the terminating "\x00".
        s.write(x.encode("utf-8"))

    @staticmethod
    def _UnsignedInteger8_writer(s, x):
        "8-bit unsigned integer"
        s.write(struct.pack("B", x))

    @staticmethod
    def _UnsignedInteger16_writer(s, x):
        "16-bit unsigned integer"
        s.write(struct.pack("H", x))

    @staticmethod
    def _UnsignedInteger24_writer(s, x):
        "24-bit unsigned integer"
        # Pack as 32 bits shifted up by one byte, then drop the first byte.
        s.write(struct.pack("I", x << 8)[1:])

    @staticmethod
    def _UnsignedInteger32_writer(s, x):
        "32-bit unsigned integer"
        s.write(struct.pack("I", x))

    @staticmethod
    def _UnsignedInteger64_writer(s, x):
        "64-bit unsigned integer"
        s.write(struct.pack("Q", x))

    @staticmethod
    def _UnsignedInteger128_writer(s, x):
        "128-bit unsigned integer"
        a, b = x & 0xFFFFFFFFFFFFFFFF, x >> 64
        s.write(struct.pack("QQ", a, b))
class BinaryWrite(Builtin):
"""
- 'BinaryWrite[$channel$, $b$]'
- writes a single byte given as an integer from 0 to 255.
- 'BinaryWrite[$channel$, {b1, b2, ...}]'
- writes a sequence of bytes.
- 'BinaryWrite[$channel$, "string"]'
- writes the raw characters in a string.
- 'BinaryWrite[$channel$, $x$, $type$]'
- writes $x$ as the specified type.
- 'BinaryWrite[$channel$, {$x1$, $x2$, ...}, $type$]'
- writes a sequence of objects as the specified type.
- 'BinaryWrite[$channel$, {$x1$, $x2$, ...}, {$type1$, $type2$, ...}]'
- writes a sequence of objects using a sequence of specified types.
>> strm = OpenWrite[BinaryFormat -> True]
= OutputStream[...]
>> BinaryWrite[strm, {39, 4, 122}]
= OutputStream[...]
>> Close[strm]
= ...
>> strm = OpenRead[%, BinaryFormat -> True]
= InputStream[...]
>> BinaryRead[strm]
= 39
>> BinaryRead[strm, "Byte"]
= 4
>> BinaryRead[strm, "Character8"]
= z
>> Close[strm];
Write a String
>> strm = OpenWrite[BinaryFormat -> True]
= OutputStream[...]
>> BinaryWrite[strm, "abc123"]
= OutputStream[...]
>> Close[%]
= ...
Read as Characters
>> strm = OpenRead[%, BinaryFormat -> True]
= InputStream[...]
>> BinaryRead[strm, {"Character8", "Character8", "Character8", "Character8", "Character8", "Character8", "Character8"}]
= {a, b, c, 1, 2, 3, EndOfFile}
>> Close[strm]
= ...
Read as Bytes
>> strm = OpenRead[%, BinaryFormat -> True]
= InputStream[...]
>> BinaryRead[strm, {"Byte", "Byte", "Byte", "Byte", "Byte", "Byte", "Byte"}]
= {97, 98, 99, 49, 50, 51, EndOfFile}
>> Close[strm]
= ...
Write Type
>> strm = OpenWrite[BinaryFormat -> True]
= OutputStream[...]
>> BinaryWrite[strm, 97, "Byte"]
= OutputStream[...]
>> BinaryWrite[strm, {97, 98, 99}, {"Byte", "Byte", "Byte"}]
= OutputStream[...]
>> Close[%]
= ...
## Write then Read as Bytes
#> WRb[bytes_, form_] := Module[{stream, res={}, byte}, stream = OpenWrite[BinaryFormat -> True]; BinaryWrite[stream, bytes, form]; stream = OpenRead[Close[stream], BinaryFormat -> True]; While[Not[SameQ[byte = BinaryRead[stream], EndOfFile]], res = Join[res, {byte}];]; Close[stream]; res]
## Byte
#> WRb[{149, 2, 177, 132}, {"Byte", "Byte", "Byte", "Byte"}]
= {149, 2, 177, 132}
#> WRb[{149, 2, 177, 132}, {"Byte", "Byte", "Byte", "Byte"}]
= {149, 2, 177, 132}
#> (# == WRb[#, Table["Byte", {50}]]) & [RandomInteger[{0, 255}, 50]]
= True
## Character8
#> WRb[{"a", "b", "c"}, {"Character8", "Character8", "Character8"}]
= {97, 98, 99}
#> WRb[{34, 60, 39}, {"Character8", "Character8", "Character8"}]
= {51, 52, 54, 48, 51, 57}
#> WRb[{"ab", "c", "d"}, {"Character8", "Character8", "Character8", "Character8"}]
= {97, 98, 99, 100}
## Character16
## TODO
## Complex64
#> WRb[-6.36877988924*^28 + 3.434203392*^9 I, "Complex64"]
= {80, 201, 77, 239, 201, 177, 76, 79}
#> WRb[-6.98948862335*^24 + 1.52209021297*^23 I, "Complex64"]
= {158, 2, 185, 232, 18, 237, 0, 102}
#> WRb[-1.41079828148*^-19 - 0.013060791418 I, "Complex64"]
= {195, 142, 38, 160, 238, 252, 85, 188}
#> WRb[{5, -2054}, "Complex64"]
= {0, 0, 160, 64, 0, 0, 0, 0, 0, 96, 0, 197, 0, 0, 0, 0}
#> WRb[Infinity, "Complex64"]
= {0, 0, 128, 127, 0, 0, 0, 0}
#> WRb[-Infinity, "Complex64"]
= {0, 0, 128, 255, 0, 0, 0, 0}
#> WRb[DirectedInfinity[1 + I], "Complex64"]
= {0, 0, 128, 127, 0, 0, 128, 127}
#> WRb[DirectedInfinity[I], "Complex64"]
= {0, 0, 0, 0, 0, 0, 128, 127}
## FIXME (different convention to MMA)
#> WRb[Indeterminate, "Complex64"]
= {0, 0, 192, 127, 0, 0, 192, 127}
## Complex128
#> WRb[1.19839770357*^-235 - 2.64656391494*^-54 I,"Complex128"]
= {102, 217, 1, 163, 234, 98, 40, 15, 243, 104, 116, 15, 48, 57, 208, 180}
#> WRb[3.22170267142*^134 - 8.98364297498*^198 I,"Complex128"]
= {219, 161, 12, 126, 47, 94, 220, 91, 189, 66, 29, 68, 147, 11, 62, 233}
#> WRb[-Infinity, "Complex128"]
= {0, 0, 0, 0, 0, 0, 240, 255, 0, 0, 0, 0, 0, 0, 0, 0}
#> WRb[DirectedInfinity[1 - I], "Complex128"]
= {0, 0, 0, 0, 0, 0, 240, 127, 0, 0, 0, 0, 0, 0, 240, 255}
#> WRb[DirectedInfinity[I], "Complex128"]
= {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 127}
## FIXME (different convention to MMA)
#> WRb[Indeterminate, "Complex128"]
= {0, 0, 0, 0, 0, 0, 248, 127, 0, 0, 0, 0, 0, 0, 248, 127}
## Complex256
## TODO
## Integer8
#> WRb[{5, 2, 11, -4}, {"Integer8", "Integer8", "Integer8", "Integer8"}]
= {5, 2, 11, 252}
#> WRb[{127, -128, 0}, {"Integer8", "Integer8", "Integer8"}]
= {127, 128, 0}
## Integer16
#> WRb[{661, -31567, 6256}, {"Integer16", "Integer16", "Integer16"}]
= {149, 2, 177, 132, 112, 24}
#> WRb[{0, 255, -1, 32640, -32640}, Table["Integer16", {5}]]
= {0, 0, 255, 0, 255, 255, 128, 127, 128, 128}
## Integer24
#> WRb[{-6247016, -6631492}, {"Integer24", "Integer24"}]
= {152, 173, 160, 188, 207, 154}
#> WRb[{-1593967, 1989169}, {"Integer24", "Integer24"}]
= {145, 173, 231, 49, 90, 30}
## Integer32
#> WRb[{-636001327, -236143729}, {"Integer32", "Integer32"}]
= {209, 99, 23, 218, 143, 187, 236, 241}
#> WRb[{2024611599, -1139645195}, {"Integer32", "Integer32"}]
= {15, 31, 173, 120, 245, 100, 18, 188}
## Integer64
#> WRb[{1176115612243989203}, "Integer64"]
= {211, 18, 152, 2, 235, 102, 82, 16}
#> WRb[{-8526737900550694619}, "Integer64"]
= {37, 217, 208, 88, 14, 241, 170, 137}
## Integer128
#> WRb[139827542997232652313568968616424513676, "Integer128"]
= {140, 32, 24, 199, 10, 169, 248, 117, 123, 184, 75, 76, 34, 206, 49, 105}
#> WRb[103439096823027953602112616165136677221, "Integer128"]
= {101, 57, 184, 108, 43, 214, 186, 120, 153, 51, 132, 225, 56, 165, 209, 77}
#> WRb[-49058912464625098822365387707690163087, "Integer128"]
= {113, 100, 125, 144, 211, 83, 140, 24, 206, 11, 198, 118, 222, 152, 23, 219}
## Real32
#> WRb[{8.398086656*^9, 1.63880017681*^16}, {"Real32", "Real32"}]
= {81, 72, 250, 79, 52, 227, 104, 90}
#> WRb[{5.6052915284*^32, 9.631141*^6}, {"Real32", "Real32"}]
= {251, 22, 221, 117, 165, 245, 18, 75}
#> WRb[Infinity, "Real32"]
= {0, 0, 128, 127}
#> WRb[-Infinity, "Real32"]
= {0, 0, 128, 255}
## FIXME (different convention to MMA)
#> WRb[Indeterminate, "Real32"]
= {0, 0, 192, 127}
## Real64
#> WRb[-5.14646619426*^227, "Real64"]
= {91, 233, 20, 87, 129, 185, 53, 239}
#> WRb[-9.69531698809*^20, "Real64"]
= {187, 67, 162, 67, 122, 71, 74, 196}
#> WRb[9.67355569764*^159, "Real64"]
= {132, 48, 80, 125, 157, 4, 38, 97}
#> WRb[Infinity, "Real64"]
= {0, 0, 0, 0, 0, 0, 240, 127}
#> WRb[-Infinity, "Real64"]
= {0, 0, 0, 0, 0, 0, 240, 255}
## FIXME (different convention to MMA)
#> WRb[Indeterminate, "Real64"]
= {0, 0, 0, 0, 0, 0, 248, 127}
## Real128
## TODO
## TerminatedString
#> WRb["abc", "TerminatedString"]
= {97, 98, 99, 0}
#> WRb[{"123", "456"}, {"TerminatedString", "TerminatedString", "TerminatedString"}]
= {49, 50, 51, 0, 52, 53, 54, 0}
#> WRb["", "TerminatedString"]
= {0}
## UnsignedInteger8
#> WRb[{96, 94, 141, 162, 141}, Table["UnsignedInteger8", {5}]]
= {96, 94, 141, 162, 141}
#> (#==WRb[#,Table["UnsignedInteger8",{50}]])&[RandomInteger[{0, 255}, 50]]
= True
## UnsignedInteger16
#> WRb[{18230, 47466, 9875, 59141}, Table["UnsignedInteger16", {4}]]
= {54, 71, 106, 185, 147, 38, 5, 231}
#> WRb[{0, 32896, 65535}, Table["UnsignedInteger16", {3}]]
= {0, 0, 128, 128, 255, 255}
## UnsignedInteger24
#> WRb[{14820174, 15488225}, Table["UnsignedInteger24", {2}]]
= {78, 35, 226, 225, 84, 236}
#> WRb[{5374629, 3889391}, Table["UnsignedInteger24", {2}]]
= {165, 2, 82, 239, 88, 59}
## UnsignedInteger32
#> WRb[{1885507541, 4157323149}, Table["UnsignedInteger32", {2}]]
= {213, 143, 98, 112, 141, 183, 203, 247}
#> WRb[{384206740, 1676316040}, Table["UnsignedInteger32", {2}]]
= {148, 135, 230, 22, 136, 141, 234, 99}
"""
# "writex" passes through the OS error text reported when flushing fails.
messages = {
"writex": "`1`.",
}
# Type name -> writer function, collected once when the class is created.
writers = _BinaryFormat.get_writers()
def apply_notype(self, name, n, b, evaluation):
    "BinaryWrite[OutputStream[name_, n_], b_]"
    # Two-argument form: hand over to apply() with no type, which makes it
    # fall back to its default type.
    return self.apply(name, n, b, typ=None, evaluation=evaluation)
def apply(self, name, n, b, typ, evaluation):
    "BinaryWrite[OutputStream[name_, n_], b_, typ_]"
    # Writes `b` (a single object or a list of objects) to the binary output
    # stream number `n`, converting each object according to `typ` (a type
    # name or a list of type names, cycled as needed; None means "Byte").
    #
    # Returns the OutputStream expression on success, the unevaluated
    # BinaryWrite expression when the stream or the type is unusable, and
    # None after a "nocoerce" message.

    channel = Expression("OutputStream", name, n)

    # Check Empty Type
    if typ is None:
        expr = Expression("BinaryWrite", channel, b)
        typ = Expression("List")
    else:
        expr = Expression("BinaryWrite", channel, b, typ)

    # Check channel
    stream = stream_manager.lookup_stream(n.get_int_value())

    # A registered stream whose file object is missing counts as not open.
    if stream is None or stream.io is None or stream.io.closed:
        evaluation.message("General", "openx", name)
        return expr

    if stream.mode not in ["wb", "ab"]:
        evaluation.message("BinaryWrite", "openr", channel)
        return expr

    # Check b
    if b.has_form("List", None):
        pyb = b.leaves
    else:
        pyb = [b]

    # Check Type
    if typ.has_form("List", None):
        types = typ.get_leaves()
    else:
        types = [typ]

    if len(types) == 0:  # Default type is "Byte"
        types = [String("Byte")]

    types = [t.get_string_value() for t in types]
    if not all(t in self.writers for t in types):
        evaluation.message("BinaryRead", "format", typ)
        return expr

    # Write to stream
    i = 0
    while i < len(pyb):
        x = pyb[i]
        # Types are "repeated as many times as necessary"
        t = types[i % len(types)]

        # Coerce x
        if t == "TerminatedString":
            text = x.get_string_value()
            # Not a string: fall through to the "nocoerce" message instead
            # of failing on None + str.
            x = None if text is None else text + "\x00"
        elif t.startswith("Real"):
            if isinstance(x, Real):
                x = x.to_python()
            elif x.has_form("DirectedInfinity", 1):
                if x.leaves[0].get_int_value() == 1:
                    x = float("+inf")
                elif x.leaves[0].get_int_value() == -1:
                    x = float("-inf")
                else:
                    x = None
            elif isinstance(x, Symbol) and x.get_name() == "System`Indeterminate":
                x = float("nan")
            else:
                x = None
            assert x is None or isinstance(x, float)
        elif t.startswith("Complex"):
            if isinstance(x, (Complex, Real, Integer)):
                x = x.to_python()
            elif x.has_form("DirectedInfinity", 1):
                x = x.leaves[0].to_python(n_evaluation=evaluation)

                # x*float('+inf') creates nan if x.real or x.imag are zero
                x = complex(
                    x.real * float("+inf") if x.real != 0 else 0,
                    x.imag * float("+inf") if x.imag != 0 else 0,
                )
            elif isinstance(x, Symbol) and x.get_name() == "System`Indeterminate":
                x = complex(float("nan"), float("nan"))
            else:
                x = None
        elif t.startswith("Character"):
            if isinstance(x, Integer):
                # An integer is written as the characters of its decimal
                # form; splice them into the work list in place of x.
                x = [String(char) for char in str(x.get_int_value())]
                pyb = list(chain(pyb[:i], x, pyb[i + 1 :]))
                x = pyb[i]
            if isinstance(x, String) and len(x.get_string_value()) > 1:
                # A longer string is written one character at a time.
                x = [String(char) for char in x.get_string_value()]
                pyb = list(chain(pyb[:i], x, pyb[i + 1 :]))
                x = pyb[i]
            x = x.get_string_value()
        elif t == "Byte" and isinstance(x, String):
            text = x.get_string_value()
            if len(text) == 0:
                # An empty string contributes no bytes; drop it rather than
                # letting ord("") raise.
                pyb = list(chain(pyb[:i], pyb[i + 1 :]))
                continue
            if len(text) > 1:
                x = [String(char) for char in text]
                pyb = list(chain(pyb[:i], x, pyb[i + 1 :]))
                x = pyb[i]
            x = ord(x.get_string_value())
        else:
            x = x.get_int_value()

        if x is None:
            return evaluation.message("BinaryWrite", "nocoerce", b)

        try:
            self.writers[t](stream.io, x)
        except struct.error:
            # Value out of range for the requested binary type.
            return evaluation.message("BinaryWrite", "nocoerce", b)
        i += 1

    try:
        stream.io.flush()
    except IOError as err:
        evaluation.message("BinaryWrite", "writex", err.strerror)
    return channel
class BinaryRead(Builtin):
    """
    - 'BinaryRead[$stream$]'
    - reads one byte from the stream as an integer from 0 to 255.
    - 'BinaryRead[$stream$, $type$]'
    - reads one object of specified type from the stream.
    - 'BinaryRead[$stream$, {$type1$, $type2$, ...}]'
    - reads a sequence of objects of specified types.
    >> strm = OpenWrite[BinaryFormat -> True]
    = OutputStream[...]
    >> BinaryWrite[strm, {97, 98, 99}]
    = OutputStream[...]
    >> Close[strm]
    = ...
    >> strm = OpenRead[%, BinaryFormat -> True]
    = InputStream[...]
    >> BinaryRead[strm, {"Character8", "Character8", "Character8"}]
    = {a, b, c}
    >> Close[strm];
    ## Write as Bytes then Read
    #> WbR[bytes_, form_] := Module[{stream, res}, stream = OpenWrite[BinaryFormat -> True]; BinaryWrite[stream, bytes]; stream = OpenRead[Close[stream], BinaryFormat -> True]; res = BinaryRead[stream, form]; Close[stream]; res]
    ## Byte
    #> WbR[{149, 2, 177, 132}, {"Byte", "Byte", "Byte", "Byte"}]
    = {149, 2, 177, 132}
    #> (# == WbR[#, Table["Byte", {50}]]) & [RandomInteger[{0, 255}, 50]]
    = True
    ## Character8
    #> WbR[{97, 98, 99}, {"Character8", "Character8", "Character8"}]
    = {a, b, c}
    #> WbR[{34, 60, 39}, {"Character8", "Character8", "Character8"}]
    = {", <, '}
    ## Character16
    #> WbR[{97, 0, 98, 0, 99, 0}, {"Character16", "Character16", "Character16"}]
    = {a, b, c}
    #> ToCharacterCode[WbR[{50, 154, 182, 236}, {"Character16", "Character16"}]]
    = {{39474}, {60598}}
    ## #> WbR[ {91, 146, 206, 54}, {"Character16", "Character16"}]
    ## = {\\:925b, \\:36ce}
    ## Complex64
    #> WbR[{80, 201, 77, 239, 201, 177, 76, 79}, "Complex64"] // InputForm
    = -6.368779889243691*^28 + 3.434203392*^9*I
    #> % // Precision
    = MachinePrecision
    #> WbR[{158, 2, 185, 232, 18, 237, 0, 102}, "Complex64"] // InputForm
    = -6.989488623351118*^24 + 1.522090212973691*^23*I
    #> WbR[{195, 142, 38, 160, 238, 252, 85, 188}, "Complex64"] // InputForm
    = -1.4107982814807285*^-19 - 0.013060791417956352*I
    ## Complex128
    #> WbR[{15,114,1,163,234,98,40,15,214,127,116,15,48,57,208,180},"Complex128"] // InputForm
    = 1.1983977035653814*^-235 - 2.6465639149433955*^-54*I
    #> WbR[{148,119,12,126,47,94,220,91,42,69,29,68,147,11,62,233},"Complex128"] // InputForm
    = 3.2217026714156333*^134 - 8.98364297498066*^198*I
    #> % // Precision
    = MachinePrecision
    #> WbR[{15,42,80,125,157,4,38,97, 0,0,0,0,0,0,240,255}, "Complex128"]
    = -I Infinity
    #> WbR[{15,42,80,125,157,4,38,97, 0,0,0,0,0,0,240,127}, "Complex128"]
    = I Infinity
    #> WbR[{15,42,80,125,157,4,38,97, 1,0,0,0,0,0,240,255}, "Complex128"]
    = Indeterminate
    #> WbR[{0,0,0,0,0,0,240,127, 15,42,80,125,157,4,38,97}, "Complex128"]
    = Infinity
    #> WbR[{0,0,0,0,0,0,240,255, 15,42,80,125,157,4,38,97}, "Complex128"]
    = -Infinity
    #> WbR[{1,0,0,0,0,0,240,255, 15,42,80,125,157,4,38,97}, "Complex128"]
    = Indeterminate
    #> WbR[{0,0,0,0,0,0,240,127, 0,0,0,0,0,0,240,127}, "Complex128"]
    = Indeterminate
    #> WbR[{0,0,0,0,0,0,240,127, 0,0,0,0,0,0,240,255}, "Complex128"]
    = Indeterminate
    ## Complex256
    ## TODO
    ## Integer8
    #> WbR[{149, 2, 177, 132}, {"Integer8", "Integer8", "Integer8", "Integer8"}]
    = {-107, 2, -79, -124}
    #> WbR[{127, 128, 0, 255}, {"Integer8", "Integer8", "Integer8", "Integer8"}]
    = {127, -128, 0, -1}
    ## Integer16
    #> WbR[{149, 2, 177, 132, 112, 24}, {"Integer16", "Integer16", "Integer16"}]
    = {661, -31567, 6256}
    #> WbR[{0, 0, 255, 0, 255, 255, 128, 127, 128, 128}, Table["Integer16", {5}]]
    = {0, 255, -1, 32640, -32640}
    ## Integer24
    #> WbR[{152, 173, 160, 188, 207, 154}, {"Integer24", "Integer24"}]
    = {-6247016, -6631492}
    #> WbR[{145, 173, 231, 49, 90, 30}, {"Integer24", "Integer24"}]
    = {-1593967, 1989169}
    ## Integer32
    #> WbR[{209, 99, 23, 218, 143, 187, 236, 241}, {"Integer32", "Integer32"}]
    = {-636001327, -236143729}
    #> WbR[{15, 31, 173, 120, 245, 100, 18, 188}, {"Integer32", "Integer32"}]
    = {2024611599, -1139645195}
    ## Integer64
    #> WbR[{211, 18, 152, 2, 235, 102, 82, 16}, "Integer64"]
    = 1176115612243989203
    #> WbR[{37, 217, 208, 88, 14, 241, 170, 137}, "Integer64"]
    = -8526737900550694619
    ## Integer128
    #> WbR[{140,32,24,199,10,169,248,117,123,184,75,76,34,206,49,105}, "Integer128"]
    = 139827542997232652313568968616424513676
    #> WbR[{101,57,184,108,43,214,186,120,153,51,132,225,56,165,209,77}, "Integer128"]
    = 103439096823027953602112616165136677221
    #> WbR[{113,100,125,144,211,83,140,24,206,11,198,118,222,152,23,219}, "Integer128"]
    = -49058912464625098822365387707690163087
    ## Real32
    #> WbR[{81, 72, 250, 79, 52, 227, 104, 90}, {"Real32", "Real32"}] // InputForm
    = {8.398086656*^9, 1.6388001768669184*^16}
    #> WbR[{251, 22, 221, 117, 165, 245, 18, 75}, {"Real32", "Real32"}] // InputForm
    = {5.605291528399748*^32, 9.631141*^6}
    #> WbR[{126, 82, 143, 43}, "Real32"] // InputForm
    = 1.0183657302847982*^-12
    #> % // Precision
    = MachinePrecision
    #> WbR[{0, 0, 128, 127}, "Real32"]
    = Infinity
    #> WbR[{0, 0, 128, 255}, "Real32"]
    = -Infinity
    #> WbR[{1, 0, 128, 255}, "Real32"]
    = Indeterminate
    #> WbR[{1, 0, 128, 127}, "Real32"]
    = Indeterminate
    ## Real64
    #> WbR[{45, 243, 20, 87, 129, 185, 53, 239}, "Real64"] // InputForm
    = -5.146466194262116*^227
    #> WbR[{192, 60, 162, 67, 122, 71, 74, 196}, "Real64"] // InputForm
    = -9.695316988087658*^20
    #> WbR[{15, 42, 80, 125, 157, 4, 38, 97}, "Real64"] // InputForm
    = 9.67355569763742*^159
    #> % // Precision
    = MachinePrecision
    #> WbR[{0, 0, 0, 0, 0, 0, 240, 127}, "Real64"]
    = Infinity
    #> WbR[{0, 0, 0, 0, 0, 0, 240, 255}, "Real64"]
    = -Infinity
    #> WbR[{1, 0, 0, 0, 0, 0, 240, 127}, "Real64"]
    = Indeterminate
    #> WbR[{1, 0, 0, 0, 0, 0, 240, 255}, "Real64"]
    = Indeterminate
    ## Real128
    ## 0x0000
    #> WbR[{0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0}, "Real128"]
    = 0.*^-4965
    #> WbR[{0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,128}, "Real128"]
    = 0.*^-4965
    ## 0x0001 - 0x7FFE
    #> WbR[{0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,255,63}, "Real128"]
    = 1.00000000000000000000000000000000
    #> WbR[{0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,255,191}, "Real128"]
    = -1.00000000000000000000000000000000
    #> WbR[{135, 62, 233, 137, 22, 208, 233, 210, 133, 82, 251, 92, 220, 216, 255, 63}, "Real128"]
    = 1.84711247573661489653389674493896
    #> WbR[{135, 62, 233, 137, 22, 208, 233, 210, 133, 82, 251, 92, 220, 216, 207, 72}, "Real128"]
    = 2.45563355727491021879689747166252*^679
    #> WbR[{74, 95, 30, 234, 116, 130, 1, 84, 20, 133, 245, 221, 113, 110, 219, 212}, "Real128"]
    = -4.52840681592341879518366539335138*^1607
    #> % // Precision
    = 33.
    ## 0x7FFF
    #> WbR[{0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,255,127}, "Real128"]
    = Infinity
    #> WbR[{0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,255,255}, "Real128"]
    = -Infinity
    #> WbR[{1,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,255,127}, "Real128"]
    = Indeterminate
    #> WbR[{1,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,255,255}, "Real128"]
    = Indeterminate
    ## TerminatedString
    #> WbR[{97, 98, 99, 0}, "TerminatedString"]
    = abc
    #> WbR[{49, 50, 51, 0, 52, 53, 54, 0, 55, 56, 57}, Table["TerminatedString", {3}]]
    = {123, 456, EndOfFile}
    #> WbR[{0}, "TerminatedString"] // InputForm
    = ""
    ## UnsignedInteger8
    #> WbR[{96, 94, 141, 162, 141}, Table["UnsignedInteger8", {5}]]
    = {96, 94, 141, 162, 141}
    #> (#==WbR[#,Table["UnsignedInteger8",{50}]])&[RandomInteger[{0, 255}, 50]]
    = True
    ## UnsignedInteger16
    #> WbR[{54, 71, 106, 185, 147, 38, 5, 231}, Table["UnsignedInteger16", {4}]]
    = {18230, 47466, 9875, 59141}
    #> WbR[{0, 0, 128, 128, 255, 255}, Table["UnsignedInteger16", {3}]]
    = {0, 32896, 65535}
    ## UnsignedInteger24
    #> WbR[{78, 35, 226, 225, 84, 236}, Table["UnsignedInteger24", {2}]]
    = {14820174, 15488225}
    #> WbR[{165, 2, 82, 239, 88, 59}, Table["UnsignedInteger24", {2}]]
    = {5374629, 3889391}
    ## UnsignedInteger32
    #> WbR[{213,143,98,112,141,183,203,247}, Table["UnsignedInteger32", {2}]]
    = {1885507541, 4157323149}
    #> WbR[{148,135,230,22,136,141,234,99}, Table["UnsignedInteger32", {2}]]
    = {384206740, 1676316040}
    ## UnsignedInteger64
    #> WbR[{95, 5, 33, 229, 29, 62, 63, 98}, "UnsignedInteger64"]
    = 7079445437368829279
    #> WbR[{134, 9, 161, 91, 93, 195, 173, 74}, "UnsignedInteger64"]
    = 5381171935514265990
    ## UnsignedInteger128
    #> WbR[{108,78,217,150,88,126,152,101,231,134,176,140,118,81,183,220}, "UnsignedInteger128"]
    = 293382001665435747348222619884289871468
    #> WbR[{53,83,116,79,81,100,60,126,202,52,241,48,5,113,92,190}, "UnsignedInteger128"]
    = 253033302833692126095975097811212718901
    ## EndOfFile
    #> WbR[{148}, {"Integer32", "Integer32","Integer32"}]
    = {EndOfFile, EndOfFile, EndOfFile}
    """

    # Lookup table keyed by binary type name; each value is called with the
    # stream's underlying io object and returns the decoded item.
    readers = _BinaryFormat.get_readers()

    # NOTE: "openw" is declared here but is not issued by the code below.
    messages = {
        "format": "`1` is not a recognized binary format.",
        "openw": "`1` is open for output.",
        "bfmt": "The stream `1` has been opened with BinaryFormat -> False and cannot be used with binary data.",
    }

    def apply_empty(self, name, n, evaluation):
        "BinaryRead[InputStream[name_, n_]]"
        # No type given: delegate with typ=None so apply() falls back to "Byte".
        return self.apply(name, n, None, evaluation)

    def apply(self, name, n, typ, evaluation):
        "BinaryRead[InputStream[name_, n_], typ_]"
        channel = Expression("InputStream", name, n)

        # `expr` is the call echoed back whenever a check below fails.
        if typ is None:
            expr = Expression("BinaryRead", channel)
            typ = String("Byte")
        else:
            expr = Expression("BinaryRead", channel, typ)

        # The stream must exist and still be open ...
        stream = stream_manager.lookup_stream(n.get_int_value())
        if stream is None or stream.io.closed:
            evaluation.message("General", "openx", name)
            return expr

        # ... and must have been opened in binary read mode.
        if stream.mode != "rb":
            evaluation.message("BinaryRead", "bfmt", channel)
            return expr

        # Normalise a single type and a list of types to a list of names.
        wants_list = typ.has_form("List", None)
        type_exprs = typ.get_leaves() if wants_list else [typ]
        type_names = [type_expr.get_string_value() for type_expr in type_exprs]
        if any(type_name not in self.readers for type_name in type_names):
            evaluation.message("BinaryRead", "format", typ)
            return expr

        # Read one item per requested type; struct.error from a reader is
        # reported as EndOfFile for that item.
        values = []
        for type_name in type_names:
            try:
                value = self.readers[type_name](stream.io)
            except struct.error:
                value = SymbolEndOfFile
            values.append(value)

        if wants_list:
            return Expression("List", *values)
        if len(values) == 1:
            return values[0]
        return None
class WriteString(Builtin):
    """
    - 'WriteString[$stream$, $str1$, $str2$, ... ]'
    - writes the strings to the output stream.
    >> stream = OpenWrite[];
    >> WriteString[stream, "This is a test 1"]
    >> WriteString[stream, "This is also a test 2"]
    >> Close[stream]
    = ...
    >> FilePrint[%]
    | This is a test 1This is also a test 2
    >> stream = OpenWrite[];
    >> WriteString[stream, "This is a test 1", "This is also a test 2"]
    >> Close[stream]
    = ...
    >> FilePrint[%]
    | This is a test 1This is also a test 2
    #> stream = OpenWrite[];
    #> WriteString[stream, 100, 1 + x + y, Sin[x + y]]
    #> Close[stream]
    = ...
    #> FilePrint[%]
    | 1001 + x + ySin[x + y]
    #> stream = OpenWrite[];
    #> WriteString[stream]
    #> Close[stream]
    = ...
    #> FilePrint[%]
    #> WriteString[%%, abc]
    #> Streams[%%%][[1]]
    = ...
    #> Close[%]
    = ...
    #> FilePrint[%]
    | abc
    """

    # NOTE: "strml" is declared but not issued by apply() below.
    messages = {
        "strml": ("`1` is not a string, stream, " "or list of strings and streams."),
        "writex": "`1`.",
    }

    attributes = "Protected"

    def apply(self, channel, expr, evaluation):
        "WriteString[channel_, expr___]"
        # Resolve a file name or stream expression to a stream expression;
        # a String channel is opened for writing by the helper.
        strm = _channel_to_stream(channel, "w")
        if strm is None:
            return

        stream = stream_manager.lookup_stream(strm.leaves[1].get_int_value())
        if stream is None or stream.io is None or stream.io.closed:
            return None

        # Render every argument as OutputForm text; the pieces are joined
        # without any separator.
        exprs = []
        for expri in expr.get_sequence():
            result = expri.format(evaluation, "System`OutputForm")
            try:
                result = result.boxes_to_text(evaluation=evaluation)
            except BoxError:
                return evaluation.message(
                    "General",
                    "notboxes",
                    Expression("FullForm", result).evaluate(evaluation),
                )
            exprs.append(result)
        line = "".join(exprs)

        # The bytes-vs-text decision depends on the underlying io object, not
        # on the Stream wrapper (which is never a BytesIO itself).
        if isinstance(stream.io, BytesIO):
            line = line.encode("utf8")

        # Both the write and the flush can fail with an I/O error; report
        # either one through WriteString::writex instead of letting the
        # exception escape from write().
        try:
            stream.io.write(line)
            stream.io.flush()
        except IOError as err:
            evaluation.message("WriteString", "writex", err.strerror)
        return SymbolNull
class _OpenAction(Builtin):
    # Shared implementation behind OpenRead, OpenWrite and OpenAppend.
    # Subclasses provide `mode` (the base open mode) and `stream_type`
    # (the head of the stream expression that is returned).
    attributes = "Protected"

    # Options not handled here:
    # BinaryFormat: 'False',
    # CharacterEncoding :> Automatic,
    # DOSTextFormat :> True,
    # FormatType -> InputForm,
    # NumberMarks :> $NumberMarks,
    # PageHeight -> 22, PageWidth -> 78,
    # TotalHeight -> Infinity,
    # TotalWidth -> Infinity

    options = {
        "BinaryFormat": "False",
        "CharacterEncoding": "$CharacterEncoding",
    }

    messages = {
        "argx": "OpenRead called with 0 arguments; 1 argument is expected.",
        "fstr": (
            "File specification `1` is not a string of " "one or more characters."
        ),
    }

    def apply_empty(self, evaluation, options):
        "%(name)s[OptionsPattern[]]"
        # Only the writing variants may be called without a path.
        if not isinstance(self, (OpenWrite, OpenAppend)):
            evaluation.message("OpenRead", "argx")
            return

        # Borrow a unique name from a temporary file; the file object is
        # closed again before apply_path opens that path.
        with tempfile.NamedTemporaryFile(dir=TMP_DIR) as tmpf:
            path = String(tmpf.name)
        return self.apply_path(path, evaluation, options)

    def apply_path(self, path, evaluation, options):
        "%(name)s[path_?NotOptionQ, OptionsPattern[]]"
        # BinaryFormat -> True switches the subclass mode to its binary form.
        mode = self.mode
        binary = options["System`BinaryFormat"].is_true()
        if binary and not mode.endswith("b"):
            mode += "b"

        # to_python() of a String keeps the surrounding quotes, so a length
        # of 2 or less means the string is empty.
        if not isinstance(path, String) or len(path.to_python()) <= 2:
            evaluation.message(type(self).__name__, "fstr", path)
            return

        path_string = path.get_string_value()
        located = path_search(path_string)
        if located is not None:
            path_string = located
        elif mode in ("r", "rb"):
            # A file to be read has to exist already.
            evaluation.message("General", "noopen", path)
            return

        try:
            encoding = self.get_option(options, "CharacterEncoding", evaluation)
            if not isinstance(encoding, String):
                return

            opener = mathics_open(
                path_string, mode=mode, encoding=encoding.get_string_value()
            )
            # Entered without a matching __exit__ here: the stream stays open.
            opener.__enter__()
            n = opener.n
        except IOError:
            evaluation.message("General", "noopen", path)
            return
        except MessageException as e:
            e.message(evaluation)
            return
        else:
            return Expression(self.stream_type, path, Integer(n))
class OpenRead(_OpenAction):
    """
    - 'OpenRead["file"]'
    - opens a file and returns an InputStream.
    >> OpenRead["ExampleData/EinsteinSzilLetter.txt"]
    = InputStream[...]
    #> Close[%];
    S> OpenRead["https://raw.githubusercontent.com/mathics/Mathics/master/README.rst"]
    = InputStream[...]
    S> Close[%];
    #> OpenRead[]
    : OpenRead called with 0 arguments; 1 argument is expected.
    = OpenRead[]
    #> OpenRead[y]
    : File specification y is not a string of one or more characters.
    = OpenRead[y]
    #> OpenRead[""]
    : File specification is not a string of one or more characters.
    = OpenRead[]
    #> OpenRead["MathicsNonExampleFile"]
    : Cannot open MathicsNonExampleFile.
    = OpenRead[MathicsNonExampleFile]
    #> OpenRead["ExampleData/EinsteinSzilLetter.txt", BinaryFormat -> True]
    = InputStream[...]
    #> Close[%];
    """

    # Head of the stream expression returned by _OpenAction.apply_path.
    stream_type = "InputStream"
    # Base open mode; _OpenAction.apply_path appends "b" for BinaryFormat -> True.
    mode = "r"
class OpenWrite(_OpenAction):
    """
    - 'OpenWrite["file"]'
    - opens a file and returns an OutputStream.
    >> OpenWrite[]
    = OutputStream[...]
    #> Close[%];
    #> OpenWrite[BinaryFormat -> True]
    = OutputStream[...]
    #> Close[%];
    """

    # Head of the stream expression returned by _OpenAction.apply_path.
    stream_type = "OutputStream"
    # Base open mode; _OpenAction.apply_path appends "b" for BinaryFormat -> True.
    mode = "w"
class OpenAppend(_OpenAction):
    """
    - 'OpenAppend["file"]'
    - opens a file and returns an OutputStream to which writes are appended.
    >> OpenAppend[]
    = OutputStream[...]
    #> Close[%];
    #> appendFile = OpenAppend["MathicsNonExampleFile"]
    = OutputStream[MathicsNonExampleFile, ...]
    #> Close[appendFile]
    = MathicsNonExampleFile
    #> DeleteFile["MathicsNonExampleFile"]
    """

    # Head of the stream expression returned by _OpenAction.apply_path.
    stream_type = "OutputStream"
    # Base open mode; _OpenAction.apply_path appends "b" for BinaryFormat -> True.
    mode = "a"
class Get(PrefixOperator):
    r"""
    - '<<$name$'
    - reads a file and evaluates each expression, returning only the last one.
    - 'Get[$name$, Trace->True]'
    - Runs Get tracing each line before it is evaluated.
    S> filename = $TemporaryDirectory <> "/example_file";
    S> Put[x + y, filename]
    S> Get[filename]
    = x + y
    S> filename = $TemporaryDirectory <> "/example_file";
    S> Put[x + y, 2x^2 + 4z!, Cos[x] + I Sin[x], filename]
    S> Get[filename]
    = Cos[x] + I Sin[x]
    S> DeleteFile[filename]
    ## TODO: Requires EndPackage implemented
    ## 'Get' can also load packages:
    ## >> << "VectorAnalysis`"
    #> Get["SomeTypoPackage`"]
    : Cannot open SomeTypoPackage`.
    = $Failed
    ## Parser Tests
    #> Hold[<< ~/some_example/dir/] // FullForm
    = Hold[Get["~/some_example/dir/"]]
    #> Hold[<<`/.\-_:$*~?] // FullForm
    = Hold[Get["`/.\\\\-_:$*~?"]]
    """

    operator = "<<"
    precedence = 720
    attributes = "Protected"

    options = {
        "Trace": "False",
    }

    def apply(self, path, evaluation, options):
        "Get[path_String, OptionsPattern[Get]]"

        def select_trace_fn(options):
            # Tracing is on when either the Trace option or the
            # Settings`$TraceGet variable is set; lines are then echoed
            # with the built-in print.
            # TODO Proper error messages
            trace_get = evaluation.parse("Settings`$TraceGet")
            tracing = (
                options["System`Trace"].to_python()
                or trace_get.evaluate(evaluation) == SymbolTrue
            )
            if not tracing:
                return None
            import builtins

            return builtins.print

        trace_fn = select_trace_fn(options)

        last_value = None
        pypath = path.get_string_value()
        definitions = evaluation.definitions
        # Publish the current value of $Path to the streams module before
        # the file is opened.
        mathics.core.streams.PATH_VAR = SymbolPath.evaluate(evaluation).to_python(
            string_quotes=False
        )
        try:
            if trace_fn:
                trace_fn(pypath)
            with mathics_open(pypath, "r") as f:
                feeder = MathicsFileLineFeeder(f, trace_fn)
                while not feeder.empty():
                    try:
                        query = parse(definitions, feeder)
                    except TranslateError:
                        # A syntax error aborts the load and yields Null.
                        return SymbolNull
                    finally:
                        # Parser messages are forwarded whether or not the
                        # parse succeeded.
                        feeder.send_messages(evaluation)
                    if query is not None:  # None: blank line / comment
                        last_value = query.evaluate(evaluation)
        except IOError:
            evaluation.message("General", "noopen", path)
            return SymbolFailed
        except MessageException as e:
            e.message(evaluation)
            return SymbolFailed
        # Only the value of the last evaluated expression is returned.
        return last_value

    def apply_default(self, filename, evaluation):
        "Get[filename_]"
        # Fallback rule: issue General::stream and hand back the call.
        unevaluated = Expression("Get", filename)
        evaluation.message("General", "stream", filename)
        return unevaluated
class Put(BinaryOperator):
    """
    - '$expr$ >> $filename$'
    - write $expr$ to a file.
    - 'Put[$expr1$, $expr2$, ..., $filename$]'
    - write a sequence of expressions to a file.
    ## Note a lot of these tests are:
    ## * a bit fragile, somewhat
    ## * somewhat OS dependent,
    ## * can leave crap in the filesystem
    ## * put in a pytest
    ##
    ## For these reasons this should be done as a pure test
    ## rather than intermingled with the doc system.
    S> Put[40!, fortyfactorial]
    : fortyfactorial is not string, InputStream[], or OutputStream[]
    = 815915283247897734345611269596115894272000000000 >> fortyfactorial
    ## FIXME: final line should be
    ## = Put[815915283247897734345611269596115894272000000000, fortyfactorial]
    S> filename = $TemporaryDirectory <> "/fortyfactorial";
    S> Put[40!, filename]
    S> FilePrint[filename]
    | 815915283247897734345611269596115894272000000000
    S> Get[filename]
    = 815915283247897734345611269596115894272000000000
    S> DeleteFile[filename]
    S> filename = $TemporaryDirectory <> "/fiftyfactorial";
    S> Put[10!, 20!, 30!, filename]
    S> FilePrint[filename]
    | 3628800
    | 2432902008176640000
    | 265252859812191058636308480000000
    S> DeleteFile[filename]
    =
    S> filename = $TemporaryDirectory <> "/example_file";
    S> Put[x + y, 2x^2 + 4z!, Cos[x] + I Sin[x], filename]
    S> FilePrint[filename]
    | x + y
    | 2*x^2 + 4*z!
    | Cos[x] + I*Sin[x]
    S> DeleteFile[filename]
    """

    operator = ">>"
    precedence = 30

    def apply(self, exprs, filename, evaluation):
        "Put[exprs___, filename_String]"
        # Open the target through OpenWrite; when that fails the expression
        # comes back without the OutputStream[name, n] shape.
        outstream = Expression("OpenWrite", filename).evaluate(evaluation)
        if not outstream.has_form("OutputStream", 2):
            return  # opening failed

        name, n = outstream.leaves
        try:
            return self.apply_input(exprs, name, n, evaluation)
        finally:
            # Close the stream even when writing raised.
            Expression("Close", outstream).evaluate(evaluation)

    def apply_input(self, exprs, name, n, evaluation):
        "Put[exprs___, OutputStream[name_, n_]]"
        stream = stream_manager.lookup_stream(n.get_int_value())
        if stream is None or stream.io is None or stream.io.closed:
            evaluation.message("Put", "openx", Expression("OutputStream", name, n))
            return

        # One InputForm rendering per expression, each on its own line.
        text = [
            evaluation.format_output(Expression("InputForm", expr))
            for expr in exprs.get_sequence()
        ]
        text = "\n".join(text) + "\n"
        stream.io.write(text)

        return SymbolNull

    def apply_default(self, exprs, filename, evaluation):
        "Put[exprs___, filename_]"
        # Fallback rule: issue General::stream and hand back the call.
        expr = Expression("Put", exprs, filename)
        evaluation.message("General", "stream", filename)
        return expr
class PutAppend(BinaryOperator):
    """
    - '$expr$ >>> $filename$'
    - append $expr$ to a file.
    - 'PutAppend[$expr1$, $expr2$, ..., $"filename"$]'
    - append a sequence of expressions to a file.
    >> Put[50!, "factorials"]
    >> FilePrint["factorials"]
    | 30414093201713378043612608166064768844377641568960512000000000000
    >> PutAppend[10!, 20!, 30!, "factorials"]
    >> FilePrint["factorials"]
    | 30414093201713378043612608166064768844377641568960512000000000000
    | 3628800
    | 2432902008176640000
    | 265252859812191058636308480000000
    >> 60! >>> "factorials"
    >> FilePrint["factorials"]
    | 30414093201713378043612608166064768844377641568960512000000000000
    | 3628800
    | 2432902008176640000
    | 265252859812191058636308480000000
    | 8320987112741390144276341183223364380754172606361245952449277696409600000000000000
    >> "string" >>> factorials
    >> FilePrint["factorials"]
    | 30414093201713378043612608166064768844377641568960512000000000000
    | 3628800
    | 2432902008176640000
    | 265252859812191058636308480000000
    | 8320987112741390144276341183223364380754172606361245952449277696409600000000000000
    | "string"
    #> DeleteFile["factorials"];
    ## writing to dir
    #> x >>> /var/
    : Cannot open /var/.
    = x >>> /var/
    ## writing to read only file
    #> x >>> /proc/uptime
    : Cannot open /proc/uptime.
    = x >>> /proc/uptime
    """

    operator = ">>>"
    precedence = 30
    attributes = "Protected"

    def apply(self, exprs, filename, evaluation):
        "PutAppend[exprs___, filename_String]"
        # Open the target through OpenAppend; when that fails the expression
        # comes back without the OutputStream[name, n] shape.
        outstream = Expression("OpenAppend", filename).evaluate(evaluation)
        if not outstream.has_form("OutputStream", 2):
            return  # opening failed

        name, n = outstream.leaves
        try:
            return self.apply_input(exprs, name, n, evaluation)
        finally:
            # Close the stream even when writing raised.
            Expression("Close", outstream).evaluate(evaluation)

    def apply_input(self, exprs, name, n, evaluation):
        "PutAppend[exprs___, OutputStream[name_, n_]]"
        stream = stream_manager.lookup_stream(n.get_int_value())
        if stream is None or stream.io is None or stream.io.closed:
            evaluation.message(
                "PutAppend", "openx", Expression("OutputStream", name, n)
            )
            return

        # One OutputForm rendering per expression, each on its own line.
        # The text is written as-is: no ASCII round trip, which would raise
        # UnicodeEncodeError on any non-ASCII character.
        text = [
            str(e.do_format(evaluation, "System`OutputForm"))
            for e in exprs.get_sequence()
        ]
        text = "\n".join(text) + "\n"
        stream.io.write(text)

        return SymbolNull

    def apply_default(self, exprs, filename, evaluation):
        "PutAppend[exprs___, filename_]"
        # Fallback rule: issue General::stream and hand back the call.
        expr = Expression("PutAppend", exprs, filename)
        evaluation.message("General", "stream", filename)
        return expr
class ReadList(Read):
    """
    - 'ReadList["$file$"]'
    - Reads all the expressions until the end of file.
    - 'ReadList["$file$", $type$]'
    - Reads objects of a specified type until the end of file.
    - 'ReadList["$file$", {$type1$, $type2$, ...}]'
    - Reads a sequence of specified types until the end of file.
    >> ReadList[StringToStream["a 1 b 2"], {Word, Number}]
    = {{a, 1}, {b, 2}}
    >> stream = StringToStream["\\"abc123\\""];
    >> ReadList[stream]
    = {abc123}
    >> InputForm[%]
    = {"abc123"}
    #> ReadList[stream, "Invalid"]
    : Invalid is not a valid format specification.
    = ReadList[..., Invalid]
    #> Close[stream];
    #> ReadList[StringToStream["a 1 b 2"], {Word, Number}, 1]
    = {{a, 1}}
    """

    # TODO
    """
    #> ReadList[StringToStream["a 1 b 2"], {Word, Number}, -1]
    : Non-negative machine-sized integer expected at position 3 in ReadList[InputStream[String, ...], {Word, Number}, -1].
    = ReadList[InputStream[String, ...], {Word, Number}, -1]
    """

    # TODO: Expression type
    """
    #> ReadList[StringToStream["123 45 x y"], Expression]
    = {5535 x y}
    """

    # TODO: Accept newlines in input
    """
    >> ReadList[StringToStream["123\nabc"]]
    = {123, abc}
    >> InputForm[%]
    = {123, abc}
    """

    rules = {
        "ReadList[stream_]": "ReadList[stream, Expression]",
    }

    attributes = "Protected"

    options = {
        "NullRecords": "False",
        "NullWords": "False",
        "RecordSeparators": '{"\r\n", "\n", "\r"}',
        "TokenWords": "{}",
        "WordSeparators": '{" ", "\t"}',
    }

    def _read_items(self, channel, types, evaluation, options, limit=None):
        # Call Read.apply repeatedly and collect what it returns, stopping at
        # EndOfFile or after `limit` items (no limit when `limit` is None).
        # Returns None when a read comes back as None or $Failed, so callers
        # can leave the ReadList call unevaluated.
        items = []
        while limit is None or len(items) < limit:
            tmp = super(ReadList, self).apply(channel, types, evaluation, options)
            if tmp is None or tmp == SymbolFailed:
                return None
            if tmp == SymbolEndOfFile:
                break
            items.append(tmp)
        return items

    def apply(self, channel, types, evaluation, options):
        "ReadList[channel_, types_, OptionsPattern[ReadList]]"
        # Options
        # TODO: Implement extra options
        # py_options = self.check_options(options)
        # null_records = py_options['NullRecords']
        # null_words = py_options['NullWords']
        # record_separators = py_options['RecordSeparators']
        # token_words = py_options['TokenWords']
        # word_separators = py_options['WordSeparators']

        items = self._read_items(channel, types, evaluation, options)
        if items is None:
            return
        return from_python(items)

    def apply_m(self, channel, types, m, evaluation, options):
        "ReadList[channel_, types_, m_, OptionsPattern[ReadList]]"
        # Options
        # TODO: Implement extra options
        # py_options = self.check_options(options)
        # null_records = py_options['NullRecords']
        # null_words = py_options['NullWords']
        # record_separators = py_options['RecordSeparators']
        # token_words = py_options['TokenWords']
        # word_separators = py_options['WordSeparators']

        # get_int_value() gives None for a non-integer m; comparing that with
        # 0 would raise TypeError, so it is rejected like a negative count.
        py_m = m.get_int_value()
        if py_m is None or py_m < 0:
            evaluation.message(
                "ReadList", "intnm", Expression("ReadList", channel, types, m)
            )
            return

        # Same stop conditions as the unbounded form, capped at m items.
        items = self._read_items(channel, types, evaluation, options, limit=py_m)
        if items is None:
            return
        return from_python(items)
class FilePrint(Builtin):
    """
    - 'FilePrint[$file$]'
    - prints the raw contents of $file$.
    #> exp = Sin[1];
    #> FilePrint[exp]
    : File specification Sin[1] is not a string of one or more characters.
    = FilePrint[Sin[1]]
    #> FilePrint["somenonexistantpath_h47sdmk^&h4"]
    : Cannot open somenonexistantpath_h47sdmk^&h4.
    = FilePrint[somenonexistantpath_h47sdmk^&h4]
    #> FilePrint[""]
    : File specification is not a string of one or more characters.
    = FilePrint[]
    """

    messages = {
        "fstr": (
            "File specification `1` is not a string of " "one or more characters."
        ),
    }

    options = {
        "CharacterEncoding": "$CharacterEncoding",
        "RecordSeparators": '{"\r\n", "\n", "\r"}',
        "WordSeparators": '{" ", "\t"}',
    }

    attributes = "Protected"

    def apply(self, path, evaluation, options):
        "FilePrint[path_, OptionsPattern[FilePrint]]"
        # The rule needs the comma between path_ and OptionsPattern: without
        # it the two are parsed as a product rather than as two arguments.

        # to_python() of a String keeps the surrounding quotes, so a valid
        # non-empty file name is a str longer than 2 that is quoted on both
        # ends. The length is checked before indexing.
        pypath = path.to_python()
        if not (
            isinstance(pypath, str)
            and len(pypath) > 2
            and pypath[0] == pypath[-1] == '"'
        ):
            evaluation.message("FilePrint", "fstr", path)
            return
        pypath = path_search(pypath[1:-1])

        # Options
        # NOTE(review): these asserts validate a user-settable option and are
        # stripped under -O; kept as-is because no message exists for it.
        record_separators = options["System`RecordSeparators"].to_python()
        assert isinstance(record_separators, list)
        assert all(
            isinstance(s, str) and s[0] == s[-1] == '"' for s in record_separators
        )
        record_separators = [s[1:-1] for s in record_separators]

        if pypath is None:
            evaluation.message("General", "noopen", path)
            return

        if not osp.isfile(pypath):
            return SymbolFailed

        try:
            with mathics_open(pypath, "r") as f:
                result = f.read()
        except IOError:
            evaluation.message("General", "noopen", path)
            return
        except MessageException as e:
            e.message(evaluation)
            return

        # Split the contents on every record separator in turn.
        result = [result]
        for sep in record_separators:
            result = [item for res in result for item in res.split(sep)]

        # A trailing separator leaves one empty record at the end; drop it.
        if result[-1] == "":
            result = result[:-1]

        for res in result:
            evaluation.print_out(from_python(res))

        return SymbolNull
class Close(Builtin):
    """
    - 'Close[$stream$]'
    - closes an input or output stream.
    >> Close[StringToStream["123abc"]]
    = String
    >> Close[OpenWrite[]]
    = ...
    #> Streams[] == (Close[OpenWrite[]]; Streams[])
    = True
    #> Close["abc"]
    : abc is not open.
    = Close[abc]
    #> strm = OpenWrite[];
    #> Close[strm];
    #> Quiet[Close[strm]]
    = Close[OutputStream[...]]
    """

    attributes = "Protected"

    # NOTE: "closex" is declared but not issued by apply() below.
    messages = {
        "closex": "`1`.",
    }

    def apply(self, channel, evaluation):
        "Close[channel_]"
        # Only InputStream[name, n] / OutputStream[name, n] can name a
        # registered stream; anything else is treated as "not open".
        stream = None
        name = None
        if channel.has_form(("InputStream", "OutputStream"), 2):
            name, n = channel.get_leaves()
            stream = stream_manager.lookup_stream(n.get_int_value())

        is_open = (
            stream is not None and stream.io is not None and not stream.io.closed
        )
        if not is_open:
            evaluation.message("General", "openx", channel)
            return

        stream.io.close()
        # The result is the stream's name (first leaf of the channel).
        return name
class StreamPosition(Builtin):
    """
    - 'StreamPosition[$stream$]'
    - returns the current position in a stream as an integer.
    >> stream = StringToStream["Mathics is cool!"]
    = ...
    >> Read[stream, Word]
    = Mathics
    >> StreamPosition[stream]
    = 7
    """

    attributes = "Protected"

    def apply_input(self, name, n, evaluation):
        "StreamPosition[InputStream[name_, n_]]"
        stream = stream_manager.lookup_stream(n.get_int_value())

        if stream is None or stream.io is None or stream.io.closed:
            evaluation.message("General", "openx", name)
            return

        # Position as reported by the underlying io object.
        return from_python(stream.io.tell())

    def apply_output(self, name, n, evaluation):
        "StreamPosition[OutputStream[name_, n_]]"
        # Output streams are handled exactly like input streams. The method
        # to delegate to is apply_input, and its result has to be returned.
        return self.apply_input(name, n, evaluation)

    def apply_default(self, stream, evaluation):
        "StreamPosition[stream_]"
        # Anything that is not a stream expression: issue General::stream.
        evaluation.message("General", "stream", stream)
        return
class SetStreamPosition(Builtin):
    """
    - 'SetStreamPosition[$stream$, $n$]'
    - sets the current position in a stream.
    >> stream = StringToStream["Mathics is cool!"]
    = ...
    >> SetStreamPosition[stream, 8]
    = 8
    >> Read[stream, Word]
    = is
    #> SetStreamPosition[stream, -5]
    : Python2 cannot handle negative seeks.
    = 10
    >> SetStreamPosition[stream, Infinity]
    = 16
    """

    # TODO: Seeks beyond stream should return stmrng message
    """
    #> SetStreamPosition[stream, 40]
    = ERROR_MESSAGE_HERE
    """

    messages = {
        "int": "Integer expected at position 2 in `1`.",
        "stmrng": (
            "Cannot set the current point in stream `1` to position `2`. The "
            "requested position exceeds the number of characters in the file"
        ),
        "python2": "Python2 cannot handle negative seeks.",  # FIXME: Python3?
    }

    attributes = "Protected"

    def _set_position(self, head, name, n, m, evaluation):
        # Shared implementation for input and output streams. `head` is the
        # stream head ("InputStream" or "OutputStream") used when the call is
        # echoed in a message.
        stream = stream_manager.lookup_stream(n.get_int_value())

        if stream is None or stream.io is None or stream.io.closed:
            evaluation.message("General", "openx", name)
            return

        # No explicit seekable() test: a stream that cannot seek raises
        # io.UnsupportedOperation (an OSError) from seek(), which the handler
        # below reports.

        seekpos = m.to_python()
        to_end = seekpos == float("inf")
        # bool is a subclass of int; True/False are not valid positions.
        is_integer = isinstance(seekpos, int) and not isinstance(seekpos, bool)
        if not (is_integer or to_end):
            # A non-integer position is an argument error, not a range error.
            evaluation.message(
                "SetStreamPosition",
                "int",
                Expression("SetStreamPosition", Expression(head, name, n), m),
            )
            return

        try:
            if to_end:
                # Infinity: move to the end of the stream.
                stream.io.seek(0, 2)
            elif seekpos < 0:
                # Negative positions are taken relative to the end.
                stream.io.seek(seekpos, 2)
            else:
                stream.io.seek(seekpos)
        except IOError:
            evaluation.message("SetStreamPosition", "python2")

        # Report where the stream actually is, even after a failed seek.
        return from_python(stream.io.tell())

    def apply_input(self, name, n, m, evaluation):
        "SetStreamPosition[InputStream[name_, n_], m_]"
        return self._set_position("InputStream", name, n, m, evaluation)

    def apply_output(self, name, n, m, evaluation):
        "SetStreamPosition[OutputStream[name_, n_], m_]"
        return self._set_position("OutputStream", name, n, m, evaluation)

    def apply_default(self, stream, evaluation):
        "SetStreamPosition[stream_]"
        # Single-argument fallback: issue General::stream.
        evaluation.message("General", "stream", stream)
        return
class Skip(Read):
    """
    - 'Skip[$stream$, $type$]'
    - skips ahead in an input stream by one object of the specified $type$.
    - 'Skip[$stream$, $type$, $n$]'
    - skips ahead in an input stream by $n$ objects of the specified $type$.
    >> stream = StringToStream["a b c d"];
    >> Read[stream, Word]
    = a
    >> Skip[stream, Word]
    >> Read[stream, Word]
    = c
    #> Close[stream];
    >> stream = StringToStream["a b c d"];
    >> Read[stream, Word]
    = a
    >> Skip[stream, Word, 2]
    >> Read[stream, Word]
    = d
    #> Skip[stream, Word]
    = EndOfFile
    #> Close[stream];
    """

    # The two-argument form skips a single object.
    rules = {
        "Skip[InputStream[name_, n_], types_]": "Skip[InputStream[name, n], types, 1]",
    }

    messages = {
        "intm": "Non-negative machine-sized integer expected at position 3 in `1`",
    }

    options = {
        "AnchoredSearch": "False",
        "IgnoreCase": "False",
        "WordSearch": "False",
        "RecordSeparators": '{"\r\n", "\n", "\r"}',
        "WordSeparators": '{" ", "\t"}',
    }

    attributes = "Protected"

    def apply(self, name, n, types, m, evaluation, options):
        "Skip[InputStream[name_, n_], types_, m_, OptionsPattern[Skip]]"
        channel = Expression("InputStream", name, n)

        # TODO Implement extra options: they are not interpreted here, only
        # forwarded unchanged to Read.apply.

        # The count must be a strictly positive Python int.
        py_m = m.to_python()
        if not isinstance(py_m, int) or py_m <= 0:
            evaluation.message(
                "Skip",
                "intm",
                Expression("Skip", Expression("InputStream", name, n), types, m),
            )
            return

        # Read and discard py_m objects; running out of input ends the skip
        # early and EndOfFile is passed on to the caller.
        for _ in range(py_m):
            outcome = super(Skip, self).apply(channel, types, evaluation, options)
            if outcome == SymbolEndOfFile:
                return outcome
        return SymbolNull
class Find(Read):
    """
    - 'Find[$stream$, $text$]'
    - find the first line in $stream$ that contains $text$.
    >> stream = OpenRead["ExampleData/EinsteinSzilLetter.txt"];
    >> Find[stream, "uranium"]
    = in manuscript, leads me to expect that the element uranium may be turned into
    >> Find[stream, "uranium"]
    = become possible to set up a nuclear chain reaction in a large mass of uranium,
    >> Close[stream]
    = ...
    >> stream = OpenRead["ExampleData/EinsteinSzilLetter.txt"];
    >> Find[stream, {"energy", "power"} ]
    = a new and important source of energy in the immediate future. Certain aspects
    >> Find[stream, {"energy", "power"} ]
    = by which vast amounts of power and large quantities of new radium-like
    >> Close[stream]
    = ...
    """

    attributes = "Protected"

    options = {
        "AnchoredSearch": "False",
        "IgnoreCase": "False",
        "WordSearch": "False",
        "RecordSeparators": '{"\r\n", "\n", "\r"}',
        "WordSeparators": '{" ", "\t"}',
    }

    def apply(self, name, n, text, evaluation, options):
        "Find[InputStream[name_, n_], text_, OptionsPattern[Find]]"
        # Options
        # TODO Implement extra options
        # py_options = self.check_options(options)
        # anchored_search = py_options['AnchoredSearch']
        # ignore_case = py_options['IgnoreCase']
        # word_search = py_options['WordSearch']
        # record_separators = py_options['RecordSeparators']
        # word_separators = py_options['WordSeparators']

        py_text = text.to_python()
        channel = Expression("InputStream", name, n)

        # Accept a single string or a list of strings. to_python() keeps the
        # quotes of a String, which is how strings are told apart here.
        if not isinstance(py_text, list):
            py_text = [py_text]

        if not all(
            isinstance(t, str) and len(t) >= 2 and t[0] == t[-1] == '"'
            for t in py_text
        ):
            evaluation.message("Find", "unknown", Expression("Find", channel, text))
            return

        py_text = [t[1:-1] for t in py_text]

        # Read record by record until one contains any of the search strings.
        while True:
            tmp = super(Find, self).apply(
                channel, Symbol("Record"), evaluation, options
            )

            # Read did not produce a record: stop rather than loop forever.
            if tmp is None:
                return
            if tmp == SymbolFailed:
                return SymbolFailed

            # End of file has to be recognised on the symbol itself, before
            # the quotes are stripped; slicing the symbol's name can never
            # equal "System`EndOfFile", which made a miss loop without end.
            if tmp == SymbolEndOfFile:
                evaluation.message(
                    "Find", "notfound", Expression("Find", channel, text)
                )
                return SymbolFailed

            py_tmp = tmp.to_python()[1:-1]
            if any(t in py_tmp for t in py_text):
                return from_python(py_tmp)
class InputStream(Builtin):
    """
    - 'InputStream[$name$, $n$]'
    - represents an input stream.
    >> stream = StringToStream["Mathics is cool!"]
    = ...
    >> Close[stream]
    = String
    """

    attributes = "Protected"

    def apply(self, name, n, evaluation):
        "InputStream[name_, n_]"
        # Produces no replacement value: the method body is just an explicit
        # "return None" for any two-argument InputStream expression.
        return None
class OutputStream(Builtin):
    """
    - 'OutputStream[$name$, $n$]'
    - represents an output stream.
    >> OpenWrite[]
    = ...
    >> Close[%]
    = ...
    """

    attributes = "Protected"

    def apply(self, name, n, evaluation):
        "OutputStream[name_, n_]"
        # Produces no replacement value: the method body is just an explicit
        # "return None" for any two-argument OutputStream expression.
        return None
class StringToStream(Builtin):
    """
    - 'StringToStream[$string$]'
    - converts a $string$ to an open input stream.
    >> strm = StringToStream["abc 123"]
    = InputStream[String, ...]
    #> Read[strm, Word]
    = abc
    #> Read[strm, Number]
    = 123
    #> Close[strm]
    = String
    """

    attributes = "Protected"

    def apply(self, string, evaluation):
        "StringToStream[string_]"
        # Wraps the text of `string` in an in-memory io.StringIO, registers
        # it with the stream manager and returns
        # InputStream[String, <stream number>].
        #
        # The pattern accepts any expression, so anything that is not a
        # String is left unevaluated (None is returned).  Without this
        # guard the quote-stripping slice below fails on values whose
        # Python form is not text and silently truncates the ones that are.
        if not isinstance(string, String):
            return

        # The Python form of a String carries surrounding double quotes;
        # strip them to get the raw text.
        pystring = string.to_python()[1:-1]
        fp = io.StringIO(str(pystring))

        # The raw text is passed as the first argument of the registration;
        # the returned stream object supplies the number `n`.
        stream = stream_manager.add(pystring, io=fp)
        return Expression("InputStream", Symbol("String"), Integer(stream.n))
class Streams(Builtin):
    """
    - 'Streams[]'
    - returns a list of all open streams.
    >> Streams[]
    = ...
    >> Streams["stdout"]
    = ...
    #> OpenWrite[]
    = ...
    #> Streams[%[[1]]]
    = {OutputStream[...]}
    #> Streams["some_nonexistant_name"]
    = {}
    """

    attributes = "Protected"

    def apply(self, evaluation):
        "Streams[]"
        # No name filter: delegate with None so every open stream is listed.
        return self.apply_name(None, evaluation)

    def apply_name(self, name, evaluation):
        "Streams[name_String]"
        # Builds a List of InputStream[...] / OutputStream[...] expressions
        # for the registered streams that are still open, restricted to
        # those whose name equals `name` unless `name` is None.
        #
        # Raises ValueError when a non-StringIO stream has a mode outside
        # the two groups below.
        reading_modes = ("r", "rb")
        writing_modes = ("w", "a", "wb", "ab")

        listed = []
        for entry in stream_manager.STREAMS.values():
            # Skip empty slots and streams that are already closed.
            if entry is None or entry.io.closed:
                continue

            if isinstance(entry.io, io.StringIO):
                # In-memory string streams are shown as InputStream[String, n].
                head, _name = "InputStream", Symbol("String")
            else:
                mode = entry.mode
                if mode in reading_modes:
                    head = "InputStream"
                elif mode in writing_modes:
                    head = "OutputStream"
                else:
                    raise ValueError("Unknown mode {0}".format(mode))
                _name = String(entry.name)

            stream_expr = Expression(head, _name, Integer(entry.n))
            if name is None or _name == name:
                listed.append(stream_expr)

        return Expression("List", *listed)