1"""
2"""
3
4# Created on 2016.07.08
5#
6# Author: Giovanni Cannata
7#
8# Copyright 2016 - 2020 Giovanni Cannata
9#
10# This file is part of ldap3.
11#
12# ldap3 is free software: you can redistribute it and/or modify
13# it under the terms of the GNU Lesser General Public License as published
14# by the Free Software Foundation, either version 3 of the License, or
15# (at your option) any later version.
16#
17# ldap3 is distributed in the hope that it will be useful,
18# but WITHOUT ANY WARRANTY; without even the implied warranty of
19# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20# GNU Lesser General Public License for more details.
21#
22# You should have received a copy of the GNU Lesser General Public License
23# along with ldap3 in the COPYING and COPYING.LESSER files.
24# If not, see <http://www.gnu.org/licenses/>.
25
26try:
27    from queue import Empty
28except ImportError:  # Python 2
29    # noinspection PyUnresolvedReferences
30    from Queue import Empty
31
32from ...core.exceptions import LDAPExtensionError
33from ...protocol.persistentSearch import persistent_search_control
34from ... import SEQUENCE_TYPES
35from ...utils.dn import safe_dn
36
37
38class PersistentSearch(object):
39    def __init__(self,
40                 connection,
41                 search_base,
42                 search_filter,
43                 search_scope,
44                 dereference_aliases,
45                 attributes,
46                 size_limit,
47                 time_limit,
48                 controls,
49                 changes_only,
50                 events_type,
51                 notifications,
52                 streaming,
53                 callback
54                 ):
55        if connection.strategy.sync:
56            raise LDAPExtensionError('Persistent Search needs an asynchronous streaming connection')
57
58        if connection.check_names and search_base:
59            search_base = safe_dn(search_base)
60
61        self.connection = connection
62        self.changes_only = changes_only
63        self.notifications = notifications
64        self.message_id = None
65        self.base = search_base
66        self.filter = search_filter
67        self.scope = search_scope
68        self.dereference_aliases = dereference_aliases
69        self.attributes = attributes
70        self.size_limit = size_limit
71        self.time_limit = time_limit
72        self.connection.strategy.streaming = streaming
73        if callback and callable(callback):
74            self.connection.strategy.callback = callback
75        elif callback:
76            raise LDAPExtensionError('callback is not callable')
77
78        if not isinstance(controls, SEQUENCE_TYPES):
79            self.controls = []
80        else:
81            self.controls = controls
82
83        if events_type and changes_only and notifications:
84            self.controls.append(persistent_search_control(events_type, changes_only, notifications))
85        self.start()
86
87    def start(self):
88        if self.message_id:  # persistent search already started
89            return
90
91        if not self.connection.bound:
92            self.connection.bind()
93
94        with self.connection.strategy.async_lock:
95            self.message_id = self.connection.search(search_base=self.base,
96                                                     search_filter=self.filter,
97                                                     search_scope=self.scope,
98                                                     dereference_aliases=self.dereference_aliases,
99                                                     attributes=self.attributes,
100                                                     size_limit=self.size_limit,
101                                                     time_limit=self.time_limit,
102                                                     controls=self.controls)
103            self.connection.strategy.persistent_search_message_id = self.message_id
104
105    def stop(self, unbind=True):
106        self.connection.abandon(self.message_id)
107        if unbind:
108            self.connection.unbind()
109        if self.message_id in self.connection.strategy._responses:
110            del self.connection.strategy._responses[self.message_id]
111        if hasattr(self.connection.strategy, '_requests') and self.message_id in self.connection.strategy._requests:  # asynchronous strategy has a dict of request that could be returned by get_response()
112            del self.connection.strategy._requests[self.message_id]
113        self.connection.strategy.persistent_search_message_id = None
114        self.message_id = None
115
116    def next(self, block=False, timeout=None):
117        if not self.connection.strategy.streaming and not self.connection.strategy.callback:
118            try:
119                return self.connection.strategy.events.get(block, timeout)
120            except Empty:
121                return None
122
123        raise LDAPExtensionError('Persistent search is not accumulating events in queue')
124
125    def funnel(self, block=False, timeout=None):
126        done = False
127        while not done:
128            try:
129                entry = self.connection.strategy.events.get(block, timeout)
130            except Empty:
131                yield None
132            if entry['type'] == 'searchResEntry':
133                yield entry
134            else:
135                done = True
136
137        yield entry
138