1# Copyright 2020 The Matrix.org Foundation C.I.C. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Contains *incomplete* type hints for txredisapi. 16""" 17from typing import Any, List, Optional, Type, Union 18 19from twisted.internet import protocol 20from twisted.internet.defer import Deferred 21 22class RedisProtocol(protocol.Protocol): 23 def publish(self, channel: str, message: bytes): ... 24 def ping(self) -> "Deferred[None]": ... 25 def set( 26 self, 27 key: str, 28 value: Any, 29 expire: Optional[int] = None, 30 pexpire: Optional[int] = None, 31 only_if_not_exists: bool = False, 32 only_if_exists: bool = False, 33 ) -> "Deferred[None]": ... 34 def get(self, key: str) -> "Deferred[Any]": ... 35 36class SubscriberProtocol(RedisProtocol): 37 def __init__(self, *args, **kwargs): ... 38 password: Optional[str] 39 def subscribe(self, channels: Union[str, List[str]]): ... 40 def connectionMade(self): ... 41 def connectionLost(self, reason): ... 42 43def lazyConnection( 44 host: str = ..., 45 port: int = ..., 46 dbid: Optional[int] = ..., 47 reconnect: bool = ..., 48 charset: str = ..., 49 password: Optional[str] = ..., 50 connectTimeout: Optional[int] = ..., 51 replyTimeout: Optional[int] = ..., 52 convertNumbers: bool = ..., 53) -> RedisProtocol: ... 54 55class ConnectionHandler: ... 56 57class RedisFactory(protocol.ReconnectingClientFactory): 58 continueTrying: bool 59 handler: RedisProtocol 60 pool: List[RedisProtocol] 61 replyTimeout: Optional[int] 62 def __init__( 63 self, 64 uuid: str, 65 dbid: Optional[int], 66 poolsize: int, 67 isLazy: bool = False, 68 handler: Type = ConnectionHandler, 69 charset: str = "utf-8", 70 password: Optional[str] = None, 71 replyTimeout: Optional[int] = None, 72 convertNumbers: Optional[int] = True, 73 ): ... 74 def buildProtocol(self, addr) -> RedisProtocol: ... 75 76class SubscriberFactory(RedisFactory): 77 def __init__(self) -> None: ... 78