1# -*- coding: utf-8 -*- 2""" 3ldap0.filters - misc stuff for handling LDAP filter strings (see RFC 4515) 4""" 5 6import time 7from typing import List, Sequence, Union 8 9from .functions import strf_secs 10 11FILTER_SPECIAL_CHARS = set('\\*()') 12 13# map for escaping special chars in LDAP filters expect back-slash 14BASIC_ESCAPE_MAP = { 15 ord('*'): '\\2a', 16 ord('('): '\\28', 17 ord(')'): '\\29', 18 ord('\x00'): '\\00', 19} 20 21 22def escape_bytes(assertion_value: bytes) -> str: 23 """ 24 escape all bytes and return str (unicode) 25 """ 26 return ''.join(['\\%02x' % char_code for char_code in assertion_value]) 27 28 29def escape_str(assertion_value: str) -> str: 30 """ 31 Replace all special characters found in assertion_value 32 by quoted notation. 33 """ 34 assert isinstance(assertion_value, str), TypeError( 35 'Expected assertion_value to be unicode (str), got %r' % (assertion_value,) 36 ) 37 return assertion_value.replace('\\', '\\5c').translate(BASIC_ESCAPE_MAP) 38 39 40def negate_filter(flt: str) -> str: 41 """ 42 Returns the simple negation of flt 43 """ 44 assert isinstance(flt, str), TypeError( 45 'Expected flt to be unicode (str), got %r' % (flt,) 46 ) 47 if flt.startswith('(!') and flt.endswith(')'): 48 return flt[2:-1] 49 return '(!{0})'.format(flt) 50 51 52def map_filter_parts(assertion_type: str, assertion_values: Sequence[str]) -> List[str]: 53 """ 54 returns filter parts all with same asserton type but different values 55 """ 56 return [ 57 '(%s=%s)' % (assertion_type, escape_str(av)) 58 for av in assertion_values 59 ] 60 61 62def compose_filter(operand: str, filter_parts: Sequence[str]): 63 """ 64 returns filter string composed by operand and list of sub-filters 65 """ 66 if not filter_parts: 67 return '' 68 if len(filter_parts) == 1: 69 return filter_parts[0] 70 return '(%s%s)' % ( 71 operand, 72 ''.join(filter_parts), 73 ) 74 75 76def dict_filter( 77 entry, 78 iop: str = '&', 79 oop: str = '&', 80 ) -> str: 81 """ 82 returns a filter compose from an entry dictionary 83 84 iop 85 inner operand used for attribute value list 86 oop 87 outer operand used for all attributes 88 """ 89 filter_parts = [] 90 for attr_type, attr_values in entry.items(): 91 filter_parts.append( 92 compose_filter( 93 iop, 94 map_filter_parts(attr_type, attr_values) 95 ) 96 ) 97 if len(filter_parts) == 1: 98 return filter_parts[0] 99 return compose_filter(oop, filter_parts) 100 101 102def time_span_filter( 103 filterstr: str = '', 104 from_timestamp: Union[int, float] = 0, 105 until_timestamp: Union[int, float, None] = None, 106 delta_attr: str = 'modifyTimestamp', 107 ): 108 """ 109 If last_run_timestr is non-zero filterstr will be extended 110 """ 111 if until_timestamp is None: 112 until_timestamp = time.time() 113 if from_timestamp < 0: 114 from_timestamp = until_timestamp + from_timestamp 115 if from_timestamp > until_timestamp: 116 raise ValueError('from_timestamp %r must not be greater than until_timestamp %r' % ( 117 from_timestamp, until_timestamp 118 )) 119 return ( 120 '(&' 121 '{filterstr}' 122 '({delta_attr}>={from_timestr})' 123 '(!({delta_attr}>={until_timestr}))' 124 ')' 125 ).format( 126 filterstr=filterstr, 127 delta_attr=delta_attr, 128 from_timestr=strf_secs(from_timestamp), 129 until_timestr=strf_secs(until_timestamp), 130 ) 131 # end of time_span_filter() 132