1 #pragma once 2 #include <QByteArray> 3 #include <QEventLoop> 4 #include <QList> 5 #include <QMap> 6 #include <QMutex> 7 #include <QObject> 8 #include <QSharedPointer> 9 #include <QTimer> 10 #include <QVariantList> 11 #include <functional> 12 13 #include "command.h" 14 #include "connectionconfig.h" 15 #include "exception.h" 16 #include "response.h" 17 #include "scancommand.h" 18 19 namespace RedisClient { 20 21 class AbstractTransporter; 22 23 typedef QMap<int, int> DatabaseList; 24 25 /** 26 * @brief The ServerInfo struct 27 * Represents redis-server information parsed from INFO command. 28 */ 29 struct ServerInfo { 30 ServerInfo(); 31 32 double version; 33 bool clusterMode; 34 bool sentinelMode; 35 DatabaseList databases; 36 37 class ParsedServerInfo : public QHash<QString, QHash<QString, QString>> { 38 public: 39 QVariantMap toVariantMap(); 40 }; 41 42 ParsedServerInfo parsed; 43 44 static ServerInfo fromString(const QString &info); 45 }; 46 47 /** 48 * @brief The Connection class 49 * Main client class. 50 */ 51 class Connection : public QObject { 52 Q_OBJECT 53 ADD_EXCEPTION 54 55 friend class AbstractTransporter; 56 57 public: 58 enum class Mode { Normal, PubSub, Cluster, Sentinel }; 59 class InvalidModeException : public Connection::Exception {}; 60 61 class SSHSupportException : public Connection::Exception { 62 public: 63 SSHSupportException(const QString &e); 64 }; 65 66 public: 67 /** 68 * @brief Constructs connection class 69 * @param c - connection config 70 * @param autoconnect - Auto connect if disconnected 71 * NOTE: different config options are required for different transporters. 72 */ 73 Connection(const ConnectionConfig &c, bool autoConnect = true); 74 75 /** 76 * @brief ~Connection 77 * If connection established internally call disconnect() 78 */ 79 virtual ~Connection(); 80 81 /** 82 * @brief connects to redis-server 83 * @param wait - true = sync mode, false = async mode 84 * @return true - on success 85 * @throws Connection::Exception if config is invalid or something went wrong. 86 */ 87 virtual bool connect(bool wait = true); 88 89 /** 90 * @brief isConnected 91 * @return 92 */ 93 virtual bool isConnected(); 94 95 /** 96 * @brief disconnect from redis-server 97 */ 98 virtual void disconnect(); 99 100 /** 101 * @brief disableAutoConnect 102 */ 103 virtual void disableAutoConnect(); 104 105 /** 106 * @brief getConfig 107 * @return 108 */ 109 ConnectionConfig getConfig() const; 110 111 /** 112 * @brief setConnectionConfig 113 */ 114 void setConnectionConfig(const ConnectionConfig &); 115 116 /** 117 * @brief Get current mode 118 * @return 119 */ 120 Mode mode() const; 121 122 /** 123 * @brief Get current db index 124 * @return int 125 */ 126 int dbIndex() const; 127 128 /** 129 * @brief Get redis-server version 130 * @return 131 */ 132 virtual double getServerVersion(); 133 134 /** 135 * @brief Get keyspace info parsed from INFO command 136 * @return 137 */ 138 virtual DatabaseList getKeyspaceInfo(); 139 140 /** 141 * @brief getEnabledModules from INFO command 142 * @return 143 */ 144 virtual QHash<QString, QString> getEnabledModules(); 145 146 /** 147 * @brief update internal structure for methods getServerVersion() and 148 * getKeyspaceInfo() 149 */ 150 virtual void refreshServerInfo(std::function<void()> callback); 151 152 /* 153 * Hi-Level Operations API 154 */ 155 /** 156 * @brief RawKeysList 157 */ 158 typedef QList<QByteArray> RawKeysList; 159 typedef std::function<void(const RawKeysList &, const QString &)> 160 RawKeysListCallback; 161 162 /** 163 * @brief getDatabaseKeys - async keys loading 164 * @param callback 165 * @param pattern 166 * @param dbIndex 167 */ 168 virtual void getDatabaseKeys(RawKeysListCallback callback, 169 const QString &pattern = QString("*"), 170 int dbIndex = 0, long scanLimit = 10000); 171 172 typedef QList<QPair<QByteArray, ulong>> RootNamespaces; 173 typedef QList<QByteArray> RootKeys; 174 typedef QPair<RootNamespaces, RootKeys> NamespaceItems; 175 typedef std::function<void(const NamespaceItems &, const QString &)> 176 NamespaceItemsCallback; 177 178 virtual void getNamespaceItems(NamespaceItemsCallback callback, 179 const QString &nsSeparator, 180 const QString &pattern = QString("*"), 181 int dbIndex = 0); 182 183 /** 184 * @brief getClusterKeys - async keys loading from all cluster nodes 185 * @param callback 186 * @param pattern 187 */ 188 virtual void getClusterKeys(RawKeysListCallback callback, 189 const QString &pattern); 190 191 /** 192 * @brief flushDbKeys - Remove keys on all master nodes 193 */ 194 virtual void flushDbKeys(int dbIndex, 195 std::function<void(const QString &)> callback); 196 197 typedef QPair<QString, int> Host; 198 typedef QList<Host> HostList; 199 200 /** 201 * @brief getMasterNodes - Get master nodes of cluster 202 * @return 203 */ 204 void getMasterNodes(std::function<void(HostList, const QString& err)> callback); 205 206 typedef QPair<int, int> Range; 207 typedef QMap<Range, Host> ClusterSlots; 208 209 /** 210 * @brief getClusterSlots 211 * @return 212 */ 213 void getClusterSlots(std::function<void(ClusterSlots, const QString& err)> callback); 214 215 /** 216 * @brief getClisterHost 217 * @param cmd 218 * @return 219 */ 220 Host getClusterHost(const Command &cmd); 221 222 /** 223 * @brief isCommandSupported 224 * @param rawCmd 225 * @return 226 */ 227 virtual QFuture<bool> isCommandSupported(QList<QByteArray> rawCmd); 228 229 /* 230 * Command execution API 231 */ 232 /** 233 * @brief command 234 * @param cmd 235 */ 236 QFuture<Response> command(const Command &cmd); 237 238 /** 239 * @brief Execute command without callback in async mode. 240 * @param rawCmd 241 * @param db 242 */ 243 QFuture<Response> command(QList<QByteArray> rawCmd, int db = -1); 244 245 /** 246 * @brief Execute command with callback in async mode. 247 * @param rawCmd 248 * @param owner 249 * @param callback 250 * @param db 251 */ 252 QFuture<Response> command(QList<QByteArray> rawCmd, QObject *owner, 253 RedisClient::Command::Callback callback, 254 int db = -1, bool priorityCmd=false); 255 256 /** 257 * @brief Hi-level wrapper with basic error handling 258 * @param rawCmd 259 * @param owner 260 * @param db 261 * @param callback 262 * @param errback 263 */ 264 inline QFuture<Response> cmd( 265 QList<QByteArray> rawCmd, QObject *owner, int db, 266 std::function<void(const RedisClient::Response &)> callback, 267 std::function<void(const QString &)> errback, 268 bool hiPriorityCmd=false) { 269 try { 270 return this->command( 271 rawCmd, owner, 272 [callback, errback](RedisClient::Response r, QString err) { 273 if (err.size() > 0) return errback(err); 274 if (r.isErrorMessage()) return errback(r.value().toString()); 275 276 return callback(r); 277 }, 278 db, hiPriorityCmd); catch(const RedisClient::Connection::Exception & e)279 } catch (const RedisClient::Connection::Exception &e) { 280 errback(QString(e.what())); 281 return QFuture<Response>(); 282 } 283 } 284 285 /** 286 * @brief pipelinedCmd 287 * @param rawCmds 288 * @param owner 289 * @param db 290 * @param callback 291 */ 292 void pipelinedCmd( 293 QList<QList<QByteArray>> rawCmds, QObject *owner, int db, 294 std::function<void(const RedisClient::Response &, QString err)> callback); 295 296 /** 297 * @brief CollectionCallback 298 */ 299 typedef std::function<void(QVariant, QString err)> CollectionCallback; 300 301 /** 302 * @brief IncrementalCollectionCallback 303 */ 304 typedef std::function<void(QVariant, QString err, bool final)> 305 IncrementalCollectionCallback; 306 307 /** 308 * @brief retrieveCollection 309 * @param cmd 310 * @param callback 311 */ 312 virtual void retrieveCollection(const ScanCommand &cmd, 313 CollectionCallback callback); 314 315 /** 316 * @brief retrieveCollection 317 * @param cmd 318 * @param callback 319 */ 320 virtual void retrieveCollectionIncrementally( 321 const ScanCommand &cmd, IncrementalCollectionCallback callback); 322 323 /** 324 * @brief runCommand 325 * @param cmd 326 * @return QFuture<Response> 327 */ 328 virtual QFuture<Response> runCommand(const Command &cmd); 329 330 /** 331 * @brief runCommands 332 * @param commands 333 */ 334 virtual void runCommands(const QList<Command> &cmd); 335 336 /** 337 * @brief waitForIdle - Wait until all commands in queue will be processed 338 * @param timeout - in milliseconds 339 */ 340 bool waitForIdle(uint timeout); 341 342 /** 343 * @brief create new connection object with same settings 344 * @return 345 */ 346 virtual QSharedPointer<Connection> clone(bool copyServerInfo = true) const; 347 348 /* 349 * Low level functions for modification 350 * commands execution. 351 */ 352 void setTransporter(QSharedPointer<AbstractTransporter>); 353 QSharedPointer<AbstractTransporter> getTransporter() const; 354 355 void callAfterConnect(std::function<void(const QString &err)> callback); 356 357 signals: 358 void addCommandsToWorker(const QList<Command> &); 359 void error(const QString &); 360 void log(const QString &); 361 void connected(); 362 void shutdownStart(); 363 void disconnected(); 364 void authOk(); 365 void authError(const QString &); 366 367 // Cluster & Sentinel 368 void reconnectTo(const QString &host, int port); 369 370 protected: 371 void createTransporter(); 372 bool isTransporterRunning(); 373 374 void processScanCommand( 375 const ScanCommand &cmd, CollectionCallback callback, 376 QSharedPointer<QVariantList> result = QSharedPointer<QVariantList>(), 377 bool incrementalProcessing = false); 378 379 void changeCurrentDbNumber(int db); 380 381 void clusterConnectToNextMasterNode( 382 std::function<void(const QString &err)> callback); 383 384 bool hasNotVisitedClusterNodes() const; 385 386 void sentinelConnectToMaster(); 387 388 void rawClusterSlots(std::function<void(QVariantList, const QString&)> callback); 389 390 protected slots: 391 void auth(); 392 393 protected: 394 ConnectionConfig m_config; 395 QSharedPointer<QThread> m_transporterThread; 396 QSharedPointer<AbstractTransporter> m_transporter; 397 398 int m_dbNumber; 399 ServerInfo m_serverInfo; 400 Mode m_currentMode; 401 QMutex m_dbNumberMutex; 402 QMutex m_blockingOp; 403 bool m_autoConnect; 404 bool m_stoppingTransporter; 405 RawKeysListCallback m_collectClusterNodeKeys; 406 RedisClient::Command::Callback m_cmdCallback; 407 QSharedPointer<HostList> m_notVisitedMasterNodes; 408 ClusterSlots m_clusterSlots; 409 }; 410 } // namespace RedisClient 411