1""" 2""" 3 4# Created on 2016.07.08 5# 6# Author: Giovanni Cannata 7# 8# Copyright 2016 - 2020 Giovanni Cannata 9# 10# This file is part of ldap3. 11# 12# ldap3 is free software: you can redistribute it and/or modify 13# it under the terms of the GNU Lesser General Public License as published 14# by the Free Software Foundation, either version 3 of the License, or 15# (at your option) any later version. 16# 17# ldap3 is distributed in the hope that it will be useful, 18# but WITHOUT ANY WARRANTY; without even the implied warranty of 19# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 20# GNU Lesser General Public License for more details. 21# 22# You should have received a copy of the GNU Lesser General Public License 23# along with ldap3 in the COPYING and COPYING.LESSER files. 24# If not, see <http://www.gnu.org/licenses/>. 25 26try: 27 from queue import Empty 28except ImportError: # Python 2 29 # noinspection PyUnresolvedReferences 30 from Queue import Empty 31 32from ...core.exceptions import LDAPExtensionError 33from ...protocol.persistentSearch import persistent_search_control 34from ... import SEQUENCE_TYPES 35from ...utils.dn import safe_dn 36 37 38class PersistentSearch(object): 39 def __init__(self, 40 connection, 41 search_base, 42 search_filter, 43 search_scope, 44 dereference_aliases, 45 attributes, 46 size_limit, 47 time_limit, 48 controls, 49 changes_only, 50 events_type, 51 notifications, 52 streaming, 53 callback 54 ): 55 if connection.strategy.sync: 56 raise LDAPExtensionError('Persistent Search needs an asynchronous streaming connection') 57 58 if connection.check_names and search_base: 59 search_base = safe_dn(search_base) 60 61 self.connection = connection 62 self.changes_only = changes_only 63 self.notifications = notifications 64 self.message_id = None 65 self.base = search_base 66 self.filter = search_filter 67 self.scope = search_scope 68 self.dereference_aliases = dereference_aliases 69 self.attributes = attributes 70 self.size_limit = size_limit 71 self.time_limit = time_limit 72 self.connection.strategy.streaming = streaming 73 if callback and callable(callback): 74 self.connection.strategy.callback = callback 75 elif callback: 76 raise LDAPExtensionError('callback is not callable') 77 78 if not isinstance(controls, SEQUENCE_TYPES): 79 self.controls = [] 80 else: 81 self.controls = controls 82 83 if events_type and changes_only and notifications: 84 self.controls.append(persistent_search_control(events_type, changes_only, notifications)) 85 self.start() 86 87 def start(self): 88 if self.message_id: # persistent search already started 89 return 90 91 if not self.connection.bound: 92 self.connection.bind() 93 94 with self.connection.strategy.async_lock: 95 self.message_id = self.connection.search(search_base=self.base, 96 search_filter=self.filter, 97 search_scope=self.scope, 98 dereference_aliases=self.dereference_aliases, 99 attributes=self.attributes, 100 size_limit=self.size_limit, 101 time_limit=self.time_limit, 102 controls=self.controls) 103 self.connection.strategy.persistent_search_message_id = self.message_id 104 105 def stop(self, unbind=True): 106 self.connection.abandon(self.message_id) 107 if unbind: 108 self.connection.unbind() 109 if self.message_id in self.connection.strategy._responses: 110 del self.connection.strategy._responses[self.message_id] 111 if hasattr(self.connection.strategy, '_requests') and self.message_id in self.connection.strategy._requests: # asynchronous strategy has a dict of request that could be returned by get_response() 112 del self.connection.strategy._requests[self.message_id] 113 self.connection.strategy.persistent_search_message_id = None 114 self.message_id = None 115 116 def next(self, block=False, timeout=None): 117 if not self.connection.strategy.streaming and not self.connection.strategy.callback: 118 try: 119 return self.connection.strategy.events.get(block, timeout) 120 except Empty: 121 return None 122 123 raise LDAPExtensionError('Persistent search is not accumulating events in queue') 124 125 def funnel(self, block=False, timeout=None): 126 done = False 127 while not done: 128 try: 129 entry = self.connection.strategy.events.get(block, timeout) 130 except Empty: 131 yield None 132 if entry['type'] == 'searchResEntry': 133 yield entry 134 else: 135 done = True 136 137 yield entry 138