1# -*- coding: utf-8 -*-
2"""
3ldap0.filters - misc stuff for handling LDAP filter strings (see RFC 4515)
4"""
5
6import time
7from typing import List, Sequence, Union
8
9from .functions import strf_secs
10
11FILTER_SPECIAL_CHARS = set('\\*()')
12
13# map for escaping special chars in LDAP filters expect back-slash
14BASIC_ESCAPE_MAP = {
15    ord('*'): '\\2a',
16    ord('('): '\\28',
17    ord(')'): '\\29',
18    ord('\x00'): '\\00',
19}
20
21
22def escape_bytes(assertion_value: bytes) -> str:
23    """
24    escape all bytes and return str (unicode)
25    """
26    return ''.join(['\\%02x' % char_code for char_code in assertion_value])
27
28
29def escape_str(assertion_value: str) -> str:
30    """
31    Replace all special characters found in assertion_value
32    by quoted notation.
33    """
34    assert isinstance(assertion_value, str), TypeError(
35        'Expected assertion_value to be unicode (str), got %r' % (assertion_value,)
36    )
37    return assertion_value.replace('\\', '\\5c').translate(BASIC_ESCAPE_MAP)
38
39
40def negate_filter(flt: str) -> str:
41    """
42    Returns the simple negation of flt
43    """
44    assert isinstance(flt, str), TypeError(
45        'Expected flt to be unicode (str), got %r' % (flt,)
46    )
47    if flt.startswith('(!') and flt.endswith(')'):
48        return flt[2:-1]
49    return '(!{0})'.format(flt)
50
51
52def map_filter_parts(assertion_type: str, assertion_values: Sequence[str]) -> List[str]:
53    """
54    returns filter parts all with same asserton type but different values
55    """
56    return [
57        '(%s=%s)' % (assertion_type, escape_str(av))
58        for av in assertion_values
59    ]
60
61
62def compose_filter(operand: str, filter_parts: Sequence[str]):
63    """
64    returns filter string composed by operand and list of sub-filters
65    """
66    if not filter_parts:
67        return ''
68    if len(filter_parts) == 1:
69        return filter_parts[0]
70    return '(%s%s)' % (
71        operand,
72        ''.join(filter_parts),
73    )
74
75
76def dict_filter(
77        entry,
78        iop: str = '&',
79        oop: str = '&',
80    ) -> str:
81    """
82    returns a filter compose from an entry dictionary
83
84    iop
85       inner operand used for attribute value list
86    oop
87       outer operand used for all attributes
88    """
89    filter_parts = []
90    for attr_type, attr_values in entry.items():
91        filter_parts.append(
92            compose_filter(
93                iop,
94                map_filter_parts(attr_type, attr_values)
95            )
96        )
97    if len(filter_parts) == 1:
98        return filter_parts[0]
99    return compose_filter(oop, filter_parts)
100
101
102def time_span_filter(
103        filterstr: str = '',
104        from_timestamp: Union[int, float] = 0,
105        until_timestamp: Union[int, float, None] = None,
106        delta_attr: str = 'modifyTimestamp',
107    ):
108    """
109    If last_run_timestr is non-zero filterstr will be extended
110    """
111    if until_timestamp is None:
112        until_timestamp = time.time()
113        if from_timestamp < 0:
114            from_timestamp = until_timestamp + from_timestamp
115    if from_timestamp > until_timestamp:
116        raise ValueError('from_timestamp %r must not be greater than until_timestamp %r' % (
117            from_timestamp, until_timestamp
118        ))
119    return (
120        '(&'
121        '{filterstr}'
122        '({delta_attr}>={from_timestr})'
123        '(!({delta_attr}>={until_timestr}))'
124        ')'
125    ).format(
126        filterstr=filterstr,
127        delta_attr=delta_attr,
128        from_timestr=strf_secs(from_timestamp),
129        until_timestr=strf_secs(until_timestamp),
130    )
131    # end of time_span_filter()
132