# Copyright 2004-2005 Joe Wreschnig, Michael Urman
#           2016 Ryan Dellenbaugh
#           2017 Nick Boultbee
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

import codecs
import re

from . import _match as match
from ._match import ParseError
from quodlibet.util import re_escape


# Precompiled regexes
TAG = re.compile(r'[~\w\s:]+')
UNARY_OPERATOR = re.compile(r'-')
BINARY_OPERATOR = re.compile(r'[+\-\*/]')
RELATIONAL_OPERATOR = re.compile(r'>=|<=|==|!=|>|<|=')
DIGITS = re.compile(r'\d+(\.\d+)?')
WORD = re.compile(r'[ \w]+')
REGEXP = re.compile(r'([^/\\]|\\.)*')
SINGLE_STRING = re.compile(r"([^'\\]|\\.)*")
DOUBLE_STRING = re.compile(r'([^"\\]|\\.)*')
MODIFIERS = re.compile(r'[cisld]*')
TEXT = re.compile(r'[^,)]+')
DATE = re.compile(r'\d{4}(-\d{1,2}(-\d{1,2})?)?')


class QueryParser(object):
    """Parse the input. One lookahead token, start symbol is Query.

    The parser is a recursive-descent parser working directly on the query
    string: `tokens` is indexed character by character and is also handed
    to the precompiled regexes above, with `index` as the read position.
    """

    def __init__(self, tokens, star=None):
        """Set up a parser over `tokens`.

        `tokens` is the query string to parse. `star` is the list of tag
        names handed to `match.Tag` by the `Star` rule; when omitted, a
        fresh empty list is used for each parser so that instances never
        share (and cannot accidentally mutate) one default list.
        """
        self.tokens = tokens
        self.index = 0
        self.last_match = None
        # A list supplied by the caller is stored by reference, not copied.
        self.star = [] if star is None else star

    def space(self):
        """Advance to the first non-space token"""
        while not self.eof() and self.tokens[self.index] == ' ':
            self.index += 1

    def accept(self, token):
        """Return whether the next token is the same as the provided token,
        and if so advance past it.

        Leading spaces are skipped first (and stay skipped even when the
        token does not match).
        """
        self.space()
        if self.eof():
            return False
        if self.tokens[self.index] == token:
            self.index += 1
            return True
        else:
            return False

    def accept_re(self, regexp):
        """Same as accept, but with a regexp instead of a single token.

        Returns the matched text, or None when the regexp does not match
        at the current position. `self.last_match` is set to that same
        value in both cases. Note that a successful match may be the empty
        string, which is falsy; compare against None to detect failure.
        """
        self.space()
        re_match = regexp.match(self.tokens, self.index)
        if re_match:
            self.index = re_match.end()
            re_match = re_match.group()
        self.last_match = re_match
        return re_match

    def expect(self, token):
        """Raise a ParseError if the next token doesn't match the provided
        token; otherwise consume it."""
        if not self.accept(token):
            raise ParseError("'{0}' expected at index {1}, but not found"
                             .format(token, self.index))

    def expect_re(self, regexp):
        """Same as expect, but with a regexp instead of a single token.

        Returns the matched text (possibly empty) and raises ParseError
        when the regexp does not match at all.
        """
        if self.accept_re(regexp) is None:
            raise ParseError("RE match expected at index {0}, but not found"
                             .format(self.index))
        return self.last_match

    def eof(self):
        """Return whether last token has been consumed"""
        return self.index >= len(self.tokens)

    def match_list(self, rule):
        """Match a comma-separated list of rules.

        `rule` is called once, then once more after every ',' accepted.
        Returns the list of results, which always has at least one item.
        """
        m = [rule()]
        while self.accept(','):
            m.append(rule())
        return m

    def StartQuery(self):
        """Match a query that extends until the end of the input.

        Raises ParseError if anything is left over after the query.
        """
        s = self.Query(outer=True)
        if not self.eof():
            raise ParseError('Query ended before end of input')
        return s

    def Query(self, outer=False):
        """Rule for a query or subquery. Determines type of query based on
        first token.

        Empty input yields `match.True_()`. `outer` is forwarded to the
        Star rule only, which is tried when neither Equals nor NotEquals
        parses.
        """
        self.space()
        if self.eof():
            return match.True_()
        elif self.accept('!'):
            return self.Negation(self.Query)
        elif self.accept('&'):
            return self.Intersection(self.Query)
        elif self.accept('|'):
            return self.Union(self.Query)
        elif self.accept('#'):
            return self.Intersection(self.Numcmp)
        elif self.accept('@'):
            return self.Extension()
        # Equals, NotEquals and Star can begin the same,
        # so try in order, backtracking on failure (with Star last)
        index = self.index
        try:
            return self.Equals()
        except ParseError:
            self.index = index
            try:
                return self.NotEquals()
            except ParseError:
                self.index = index
                return self.Star(outer=outer)

    def Negation(self, rule):
        """Rule for '!query'. '!' token is consumed in Query"""
        return match.Neg(rule())

    def Intersection(self, rule):
        """Rule for '&(query, query)'. '&' token is consumed in Query"""
        self.expect('(')
        inter = match.Inter(self.match_list(rule))
        self.expect(')')
        return inter

    def Union(self, rule):
        """Rule for '|(query, query)'. '|' token is consumed in Query"""
        self.expect('(')
        union = match.Union(self.match_list(rule))
        self.expect(')')
        return union

    def Numcmp(self):
        """Rule for numerical comparison like 'length > 3:30'.

        Comparisons may be chained ('a < b < c'); each adjacent pair
        becomes one `match.Numcmp` and more than one pair is wrapped in
        `match.Inter`. Raises ParseError if no relational operator follows
        the first expression.
        """
        cmps = []
        expr2 = self.Numexpr(allow_date=True)
        while self.accept_re(RELATIONAL_OPERATOR):
            # The right-hand side of the previous comparison is the
            # left-hand side of the next one.
            expr = expr2
            relop = self.last_match
            expr2 = self.Numexpr(allow_date=True)
            cmps.append(match.Numcmp(expr, relop, expr2))
        if not cmps:
            raise ParseError('No relational operator in numerical comparison')
        if len(cmps) > 1:
            return match.Inter(cmps)
        else:
            return cmps[0]

    def Numexpr(self, allow_date=False):
        """Rule for numerical expression like 'playcount + 4'.

        When `allow_date` is true, digit sequences shaped like a date
        (YYYY, YYYY-MM or YYYY-MM-DD) are first offered to
        `match.NumexprNumberOrDate`.
        """
        if self.accept('('):
            expr = match.NumexprGroup(self.Numexpr(allow_date=True))
            self.expect(')')
        elif self.accept_re(UNARY_OPERATOR):
            expr = match.NumexprUnary(self.last_match, self.Numexpr())
        elif allow_date and self.accept_re(DATE):
            # Parse sequences of numbers that looks like dates as either dates
            # or numbers
            try:
                expr = match.NumexprNumberOrDate(self.last_match)
            except ValueError:
                # If the date can't be parsed then backtrack and try again
                # without allowing dates
                self.index -= len(self.last_match)
                expr = self.Numexpr(allow_date=False)
        elif self.accept_re(DIGITS):
            number = float(self.last_match)
            if self.accept(':'):
                # time like 4:15
                number2 = float(self.expect_re(DIGITS))
                expr = match.NumexprNumber(60 * number + number2)
            elif self.accept_re(WORD):
                # Number with units like 7 minutes
                expr = match.numexprUnit(number, self.last_match)
            else:
                expr = match.NumexprNumber(number)
        else:
            # Either tag name or special name like "today"
            expr = match.numexprTagOrSpecial(self.expect_re(TAG).strip())
        if self.accept_re(BINARY_OPERATOR):
            # Try matching a binary operator then the second argument
            binop = self.last_match
            expr2 = self.Numexpr()
            return match.NumexprBinary(binop, expr, expr2)
        else:
            return expr

    def Extension(self):
        """Rule for plugin use like @(plugin: arguments).

        '@' token is consumed in Query. The body is None when no ':'
        follows the plugin name.
        """
        self.expect('(')
        name = self.expect_re(WORD)
        if self.accept(':'):
            body = self.ExtBody()
        else:
            body = None
        self.expect(')')
        return match.Extension(name, body)

    def ExtBody(self):
        """Body of plugin expression. Matches balanced parentheses.

        Scans forward to the first ')' that is not balanced by an earlier
        '(' and returns the text before it, leaving that ')' unconsumed.
        A backslash makes the scan skip the following character. Raises
        ParseError if the input ends while a '(' is still open.
        """
        depth = 0
        index = self.index
        try:
            while True:
                current = self.tokens[index]
                if current == '(':
                    depth += 1
                elif current == ')':
                    if depth == 0:
                        break
                    depth -= 1
                elif current == '\\':
                    # Skip the escaped character as well
                    index += 1
                index += 1
        except IndexError:
            # Ran off the end of the input. With no '(' left open this is
            # not reported here; the caller's expect(')') will fail instead.
            if depth != 0:
                raise ParseError('Unexpected end of string while parsing '
                                 'extension body')
        result = self.tokens[self.index:index]
        self.index = index
        return result

    def Equals(self):
        """Rule for 'tag=value' queries.

        Accepts a comma-separated list of tags before the '='.
        """
        tags = self.match_list(lambda: self.expect_re(TAG))
        tags = [tag.strip() for tag in tags]
        self.expect('=')
        value = self.Value()
        return match.Tag(tags, value)

    def NotEquals(self):
        """Rule for 'tag!=value' queries.

        Accepts a comma-separated list of tags before the '!='.
        """
        tags = self.match_list(lambda: self.expect_re(TAG))
        tags = [tag.strip() for tag in tags]
        self.expect('!')
        self.expect('=')
        value = self.Value()
        return match.Neg(match.Tag(tags, value))

    def Value(self, outer=False):
        """Rule for value. Either a regexp, quoted string, boolean combination
        of values, or free text string.

        With `outer` true, free text is rejected with a ParseError instead
        of being parsed here.
        """
        if self.accept('/'):
            regex = self.expect_re(REGEXP)
            self.expect('/')
            return self.RegexpMods(regex)
        elif self.accept('"'):
            regex = self.str_to_re(self.expect_re(DOUBLE_STRING))
            self.expect('"')
            return self.RegexpMods(regex)
        elif self.accept("'"):
            regex = self.str_to_re(self.expect_re(SINGLE_STRING))
            self.expect("'")
            return self.RegexpMods(regex)
        elif self.accept('!'):
            return self.Negation(self.Value)
        elif self.accept('|'):
            return self.Union(self.Value)
        elif self.accept('&'):
            return self.Intersection(self.Value)
        else:
            if outer:
                # Hack to force plain text parsing for top level free text
                raise ParseError('Free text not allowed at top level of query')

            return match.Regex(re_escape(self.expect_re(TEXT)), u"d")

    def RegexpMods(self, regex):
        """Consume regexp modifiers from tokens and compile provided regexp
        with them.
        """

        # MODIFIERS also matches the empty string, so this never fails when
        # no modifier letters follow.
        mod_string = self.expect_re(MODIFIERS)
        return match.Regex(regex, mod_string)

    def Star(self, outer=False):
        """Rule for value that matches all visible tags"""
        return match.Tag(self.star, self.Value(outer=outer))

    def str_to_re(self, string):
        """Convert plain string to escaped regexp that can be compiled.

        Backslash escapes in `string` are decoded first; the result is
        regex-escaped and anchored with '^' and '$'.
        """
        if isinstance(string, str):
            string = string.encode('utf-8')
        string = codecs.escape_decode(string)[0]
        string = string.decode('utf-8')
        return "^%s$" % re_escape(string)