1"""Runtime support for jsparagus-generated parsers.""" 2 3# Nt is unused here, but we re-export it. 4from .grammar import Nt, InitNt, End 5from .lexer import UnexpectedEndError 6import collections 7from dataclasses import dataclass 8 9 10__all__ = ['ACCEPT', 'ERROR', 'Nt', 'InitNt', 'End', 'Parser', 'ErrorToken'] 11 12# Actions are encoded as 64-bit signed integers, with the following meanings: 13# - n in range(0, 0x8000_0000_0000_0000) - shift to state n 14# - n in range(0x8000_0000_0000_0000, 0xc000_0000_0000_0000) - call special_case(n & SPECIAL_CASE_MASK) 15# - n == ERROR (0xbfff_ffff_ffff_fffe) 16# - n == ACCEPT (0xbfff_ffff_ffff_ffff) 17# - n in range(0xc000_0000_0000_0000, 0x1_0000_0000_0000_0000) - reduce by production -n - 1 18 19SPECIAL_CASE_MASK = 0x3fff_ffff_ffff_ffff 20SPECIAL_CASE_TAG = -0x8000_0000_0000_0000 21ACCEPT = 0x_bfff_ffff_ffff_ffff - (1 << 64) 22ERROR = ACCEPT - 1 23 24 25@dataclass(frozen=True) 26class ErrorTokenClass: 27 def __repr__(self): 28 return 'ErrorToken' 29 30 31ErrorToken = ErrorTokenClass() 32 33 34def throw_syntax_error(actions, state, t, tokens): 35 assert t is not None 36 if isinstance(state, StateTermValue): 37 state = state.state 38 expected = set(actions[state].keys()) 39 expected = set(e for e in expected if not isinstance(e, Nt)) 40 41 # Tidy up the `expected` set a bit. 42 if End() in expected: 43 expected.remove(End()) 44 expected.add("end of input") 45 if ErrorToken in expected: 46 # This is possible because we restore the stack in _try_error_handling 47 # after reducing and then failing to find a recovery rule after all. 48 # But don't tell people in error messages that an error is one of the 49 # things we expect. It makes no sense. 50 expected.remove(ErrorToken) 51 52 if len(expected) < 2: 53 tokens.throw("expected {!r}, got {!r}".format(list(expected)[0], t)) 54 else: 55 tokens.throw("expected one of {!r}, got {!r}" 56 .format(sorted(expected), t)) 57 58 59StateTermValue = collections.namedtuple("StateTermValue", "state term value new_line") 60 61 62class ShiftError(Exception): 63 pass 64 65 66class ShiftAccept(Exception): 67 pass 68 69 70class Parser: 71 """Parser using jsparagus-generated tables. 72 73 The usual design is, a parser object consumes a token iterator. 74 This Parser is not like that. Instead, the lexer feeds tokens to it 75 by calling `parser.write_terminal(lexer, token)` repeatedly, then 76 `parser.close(lexer)`. 77 78 The parser uses these methods of the lexer object: 79 80 * lexer.take() - Return data associated with a token, like the 81 numeric value of an int literal token. 82 83 * lexer.throw(message) - Throw a syntax error. (This is on the lexer 84 because the lexer has the current position.) 85 86 * lexer.throw_unexpected_end() - Throw a syntax error after we 87 successfully parsed the whole file except more tokens were expected at 88 the end. 89 90 """ 91 92 def __init__(self, actions, error_codes, entry_state, methods): 93 self.actions = actions 94 self.stack = [StateTermValue(entry_state, None, None, False)] 95 self.replay = [] 96 self.flags = collections.defaultdict(lambda: []) 97 self.error_codes = error_codes 98 self.methods = methods 99 self.closed = False 100 self.debug = False 101 self.is_simulator = False 102 self.last_shift = None 103 104 def clone(self): 105 return Parser(self.actions, self.error_codes, 0, self.methods) 106 107 def simulator_clone(self): 108 """Make a copy of this parser for simulation. 109 110 The copy has a version of the self.reductions table that never actually 111 does anything. 112 113 This is absurdly expensive and is for very odd and special use cases. 114 """ 115 p = self.clone() 116 p.stack = self.stack[:] 117 p.replay = self.replay[:] 118 p.debug = self.debug 119 p.is_simulator = True 120 return p 121 122 def _str_stv(self, stv): 123 # NOTE: replace this function by repr(), to inspect wrong computations. 124 val = '' 125 if stv.value: 126 val = '*' 127 return "-- {} {}--> {}".format(stv.term, val, stv.state) 128 129 def _dbg_where(self, t=""): 130 name = "stack" 131 if self.is_simulator: 132 name = "simulator" 133 print("{}: {}; {}\nexpect one of: {}".format( 134 name, 135 " ".join(self._str_stv(s) for s in self.stack), t, 136 repr(self.actions[self.stack[-1].state]) 137 )) 138 139 def _shift(self, stv, lexer): 140 state = self.stack[-1].state 141 if self.debug: 142 self._dbg_where("shift: {}".format(str(stv.term))) 143 if not isinstance(self.actions[state], dict): 144 # This happens after raising a ShiftAccept error. 145 if stv.term == End(): 146 raise ShiftAccept() 147 raise ShiftError() 148 self.last_shift = (state, stv) 149 while True: 150 goto = self.actions[state].get(stv.term, ERROR) 151 if goto == ERROR: 152 if self.debug: 153 self._dbg_where("(error)") 154 self._try_error_handling(lexer, stv) 155 stv = self.replay.pop() 156 if self.debug: 157 self._dbg_where("error: {}".format(str(stv.term))) 158 continue 159 state = goto 160 self.stack.append(StateTermValue(state, stv.term, stv.value, stv.new_line)) 161 action = self.actions[state] 162 if not isinstance(action, dict): # Action 163 if self.debug: 164 self._dbg_where("(action {})".format(state)) 165 action(self, lexer) 166 state = self.stack[-1].state 167 action = self.actions[state] 168 # Actions should always unwind or do an epsilon transition to a 169 # shift state. 170 assert isinstance(action, dict) 171 if self.replay != []: 172 stv = self.replay.pop() 173 if self.debug: 174 self._dbg_where("replay: {}".format(repr(stv.term))) 175 else: 176 break 177 178 def replay_action(self, dest): 179 # This code emulates the code which would be executed by the shift 180 # function, if we were to return to this shift function instead of 181 # staying within the action functions. The destination provided as 182 # argument should match the content of the parse table, otherwise this 183 # would imply that the replay action does not encode a transition from 184 # the parse table. 185 state = self.stack[-1].state 186 stv = self.replay.pop() 187 if self.debug: 188 self._dbg_where("(inline-replay: {})".format(repr(stv.term))) 189 goto = self.actions[state].get(stv.term, ERROR) 190 assert goto == dest 191 self.stack.append(StateTermValue(dest, stv.term, stv.value, stv.new_line)) 192 193 def shift_list(self, stv_list, lexer): 194 self.replay.extend(reversed(stv_list)) 195 196 def write_terminal(self, lexer, t): 197 assert not self.closed 198 try: 199 stv = StateTermValue(0, t, lexer.take(), lexer.saw_line_terminator()) 200 self._shift(stv, lexer) 201 except ShiftAccept: 202 if self.debug: 203 self._dbg_where("(write_terminal accept)") 204 if self.replay != []: 205 state, stv = self.last_shift 206 throw_syntax_error(self.actions, state, lexer.take(), lexer) 207 except ShiftError: 208 state, stv = self.last_shift 209 throw_syntax_error(self.actions, state, lexer.take(), lexer) 210 211 def close(self, lexer): 212 assert not self.closed 213 self.closed = True 214 try: 215 self._shift(StateTermValue(0, End(), End(), False), lexer) 216 except ShiftAccept: 217 if self.debug: 218 self._dbg_where("(close accept)") 219 print(repr(self.stack)) 220 while self.stack[-1].term == End(): 221 self.stack.pop() 222 assert len(self.stack) == 2 223 assert self.stack[0].term is None 224 assert isinstance(self.stack[1].term, Nt) 225 return self.stack[1].value 226 227 def top_state(self): 228 return self.stack[-1].state 229 230 def check_not_on_new_line(self, lexer, peek): 231 if peek <= 0: 232 raise ValueError("check_not_on_new_line got an impossible peek offset") 233 if not self.stack[-peek].new_line: 234 return True 235 for _ in range(peek - 1): 236 self.replay.append(self.stack.pop()) 237 stv = self.stack.pop() 238 self._try_error_handling(lexer, stv) 239 return False 240 241 def _try_error_handling(self, lexer, stv): 242 # Error recovery version of the code in write_terminal. Three differences 243 # between this and write_terminal are commented below. 244 if stv.term is ErrorToken: 245 if stv.value == End(): 246 lexer.throw_unexpected_end() 247 raise 248 throw_syntax_error(self.actions, self.stack[-1], stv.value, lexer) 249 raise 250 251 state = self.stack[-1].state 252 error_code = self.error_codes[state] 253 if error_code is not None: 254 self.on_recover(error_code, lexer, stv) 255 self.replay.append(stv) 256 self.replay.append(StateTermValue(0, ErrorToken, stv.value, stv.new_line)) 257 elif stv.term == End(): 258 lexer.throw_unexpected_end() 259 raise 260 else: 261 throw_syntax_error(self.actions, self.stack[-1], stv.value, lexer) 262 raise 263 264 def on_recover(self, error_code, lexer, stv): 265 """Called when the grammar says to recover from a parse error. 266 267 Subclasses can override this to add custom code when an ErrorSymbol in 268 a production is matched. This base-class implementation does nothing, 269 allowing the parser to recover from the error silently. 270 """ 271 pass 272 273 def can_accept_terminal(self, lexer, t): 274 """Return True if the terminal `t` is OK next. 275 276 False if it's an error. `t` can be None, querying if we can accept 277 end-of-input. 278 """ 279 class BogusLexer: 280 def throw_unexpected_end(self): 281 raise UnexpectedEndError("") 282 283 def throw(self, message): 284 raise SyntaxError(message) 285 286 def take(self): 287 return str(t) 288 289 def saw_line_terminator(self): 290 return lexer.saw_line_terminator() 291 292 sim = self.simulator_clone() 293 try: 294 sim.write_terminal(BogusLexer(), t) 295 except Exception: 296 return False 297 return True 298 299 def can_close(self): 300 """Return True if self.close() would succeed.""" 301 302 # The easy case: no error, parsing just succeeds. 303 # The hard case: maybe error-handling would succeed? 304 # The easiest thing is simply to run the method. 305 class BogusLexer: 306 def throw_unexpected_end(self): 307 raise UnexpectedEndError("") 308 309 def throw(self, message): 310 raise SyntaxError(message) 311 312 sim = self.simulator_clone() 313 try: 314 sim.close(BogusLexer()) 315 except SyntaxError: 316 return False 317 return True 318