1""" 2Helper functions and data structures 3""" 4import functools 5import re 6from contextlib import contextmanager 7from importlib import import_module 8 9 10def camel_to_snake_case(name): # type: (str) -> str 11 """Takes a camelCased string and converts to snake_case.""" 12 pattern = r"[A-Z][a-z]+|[A-Z]+(?![a-z])" 13 return "_".join(map(str.lower, re.findall(pattern, name))) 14 15 16def is_paired(text, open="(", close=")"): # type: (str, str, str) -> bool 17 """Check if the text only contains: 18 1. blackslash escaped parentheses, or 19 2. parentheses paired. 20 """ 21 count = 0 22 escape = False 23 for c in text: 24 if escape: 25 escape = False 26 elif c == "\\": 27 escape = True 28 elif c == open: 29 count += 1 30 elif c == close: 31 if count == 0: 32 return False 33 count -= 1 34 return count == 0 35 36 37def _preprocess_text(text): # type: (str) -> str 38 return text.replace("\r\n", "\n") 39 40 41class Source: 42 """Wrapper class on content to be parsed""" 43 44 def __init__(self, text): # type: (str) -> None 45 self._buffer = _preprocess_text(text) 46 self.pos = 0 47 self._anchor = 0 48 self._states = [] # type: List[BlockElement] 49 self.match = None # type: Optional[Match] 50 51 @property 52 def state(self): # type: () -> BlockElement 53 """Returns the current element state.""" 54 if not self._states: 55 raise RuntimeError("Need to push a state first.") 56 return self._states[-1] 57 58 @property 59 def root(self): # type: () -> BlockElement 60 """Returns the root element, which is at the bottom of self._states.""" 61 if not self._states: 62 raise RuntimeError("Need to push a state first.") 63 return self._states[0] 64 65 def push_state(self, element): # type: (BlockElement) -> None 66 """Push a new state to the state stack.""" 67 self._states.append(element) 68 69 def pop_state(self): # type: () -> BlockElement 70 """Pop the top most state.""" 71 return self._states.pop() 72 73 @contextmanager 74 def under_state(self, element): 75 # type: (BlockElement) -> Generator[Source, None, None] 76 """A context manager to enable a new state temporarily.""" 77 self.push_state(element) 78 yield self 79 self.pop_state() 80 81 @property 82 def exhausted(self): # type: () -> bool 83 """Indicates whether the source reaches the end.""" 84 return self.pos >= len(self._buffer) 85 86 @property 87 def prefix(self): # type: () -> str 88 """The prefix of each line when parsing.""" 89 return "".join(s._prefix for s in self._states) 90 91 def _expect_re(self, regexp, pos): 92 # type: (Union[Pattern, str], int) -> Optional[Match] 93 if isinstance(regexp, str): 94 regexp = re.compile(regexp) 95 return regexp.match(self._buffer, pos) 96 97 @staticmethod 98 @functools.lru_cache() 99 def match_prefix(prefix, line): # type: (str, str) -> int 100 """Check if the line starts with given prefix and 101 return the position of the end of prefix. 102 If the prefix is not matched, return -1. 103 """ 104 m = re.match(prefix, line.expandtabs(4)) 105 if not m: 106 if re.match(prefix, line.expandtabs(4).replace("\n", " " * 99 + "\n")): 107 return len(line) - 1 108 return -1 109 pos = m.end() 110 if pos == 0: 111 return 0 112 for i in range(1, len(line) + 1): 113 if len(line[:i].expandtabs(4)) >= pos: 114 return i 115 return -1 # pragma: no cover 116 117 def expect_re(self, regexp): # type: (Union[Pattern, str]) -> Optional[Match] 118 """Test against the given regular expression and returns the match object. 119 120 :param regexp: the expression to be tested. 121 :returns: the match object. 122 """ 123 prefix_len = self.match_prefix( 124 self.prefix, self.next_line(require_prefix=False) # type: ignore 125 ) 126 if prefix_len >= 0: 127 match = self._expect_re(regexp, self.pos + prefix_len) 128 self.match = match 129 return match 130 else: 131 return None 132 133 def next_line(self, require_prefix=True): # type: (bool) -> Optional[str] 134 """Return the next line in the source. 135 136 :param require_prefix: if False, the whole line will be returned. 137 otherwise, return the line with prefix stripped or None if the prefix 138 is not matched. 139 """ 140 if require_prefix: 141 m = self.expect_re(r"(?m)[^\n]*?$\n?") 142 else: 143 m = self._expect_re(r"(?m)[^\n]*$\n?", self.pos) 144 self.match = m 145 if m: 146 return m.group() 147 return None 148 149 def consume(self): # type: () -> None 150 """Consume the body of source. ``pos`` will move forward.""" 151 if self.match: 152 self.pos = self.match.end() 153 if self.match.group()[-1:] == "\n": 154 self._update_prefix() 155 self.match = None 156 157 def anchor(self): # type: () -> None 158 """Pin the current parsing position.""" 159 self._anchor = self.pos 160 161 def reset(self): # type: () -> None 162 """Reset the position to the last anchor.""" 163 self.pos = self._anchor 164 165 def _update_prefix(self): # type: () -> None 166 for s in self._states: 167 if hasattr(s, "_second_prefix"): 168 s._prefix = s._second_prefix # type: ignore 169 170 171def normalize_label(label): # type: (str) -> str 172 """Return the normalized form of link label.""" 173 return re.sub(r"\s+", " ", label).strip().casefold() 174 175 176def load_extension_object(name): 177 """Load extension object from a string. 178 First try `marko.ext.<name>` if possible 179 """ 180 module = None 181 if "." not in name: 182 try: 183 module = import_module(f"marko.ext.{name}") 184 except ImportError: 185 pass 186 if module is None: 187 try: 188 module = import_module(name) 189 except ImportError: 190 raise ImportError( 191 f"Extension {name} cannot be found. Please check the name." 192 ) 193 194 try: 195 maker = getattr(module, "make_extension") 196 except AttributeError: 197 raise AttributeError( 198 f"Module {name} does not have 'make_extension' attributte." 199 ) 200 return maker 201 202 203def is_type_check() -> bool: # pragma: no cover 204 try: 205 from typing import TYPE_CHECKING 206 except ImportError: 207 return False 208 else: 209 return TYPE_CHECKING 210 211 212if is_type_check(): 213 from .block import BlockElement 214 from typing import Optional, List, Generator, Union, Pattern, Match 215