1 #pragma once
2 #include <QByteArray>
3 #include <QEventLoop>
4 #include <QList>
5 #include <QMap>
6 #include <QMutex>
7 #include <QObject>
8 #include <QSharedPointer>
9 #include <QTimer>
10 #include <QVariantList>
11 #include <functional>
12 
13 #include "command.h"
14 #include "connectionconfig.h"
15 #include "exception.h"
16 #include "response.h"
17 #include "scancommand.h"
18 
19 namespace RedisClient {
20 
21 class AbstractTransporter;
22 
23 typedef QMap<int, int> DatabaseList;
24 
25 /**
26  * @brief The ServerInfo struct
27  * Represents redis-server information parsed from INFO command.
28  */
29 struct ServerInfo {
30   ServerInfo();
31 
32   double version;
33   bool clusterMode;
34   bool sentinelMode;
35   DatabaseList databases;
36 
37   class ParsedServerInfo : public QHash<QString, QHash<QString, QString>> {
38    public:
39     QVariantMap toVariantMap();
40   };
41 
42   ParsedServerInfo parsed;
43 
44   static ServerInfo fromString(const QString &info);
45 };
46 
47 /**
48  * @brief The Connection class
49  * Main client class.
50  */
51 class Connection : public QObject {
52   Q_OBJECT
53   ADD_EXCEPTION
54 
55   friend class AbstractTransporter;
56 
57  public:
58   enum class Mode { Normal, PubSub, Cluster, Sentinel };
59   class InvalidModeException : public Connection::Exception {};
60 
61   class SSHSupportException : public Connection::Exception {
62    public:
63     SSHSupportException(const QString &e);
64   };
65 
66  public:
67   /**
68    * @brief Constructs connection class
69    * @param c - connection config
70    * @param autoconnect - Auto connect if disconnected
71    * NOTE: different config options are required for different transporters.
72    */
73   Connection(const ConnectionConfig &c, bool autoConnect = true);
74 
75   /**
76    * @brief ~Connection
77    * If connection established internally call disconnect()
78    */
79   virtual ~Connection();
80 
81   /**
82    * @brief connects to redis-server
83    * @param wait -  true = sync mode, false = async mode
84    * @return true - on success
85    * @throws Connection::Exception if config is invalid or something went wrong.
86    */
87   virtual bool connect(bool wait = true);
88 
89   /**
90    * @brief isConnected
91    * @return
92    */
93   virtual bool isConnected();
94 
95   /**
96    * @brief disconnect from redis-server
97    */
98   virtual void disconnect();
99 
100   /**
101    * @brief disableAutoConnect
102    */
103   virtual void disableAutoConnect();
104 
105   /**
106    * @brief getConfig
107    * @return
108    */
109   ConnectionConfig getConfig() const;
110 
111   /**
112    * @brief setConnectionConfig
113    */
114   void setConnectionConfig(const ConnectionConfig &);
115 
116   /**
117    * @brief Get current mode
118    * @return
119    */
120   Mode mode() const;
121 
122   /**
123    * @brief Get current db index
124    * @return int
125    */
126   int dbIndex() const;
127 
128   /**
129    * @brief Get redis-server version
130    * @return
131    */
132   virtual double getServerVersion();
133 
134   /**
135    * @brief Get keyspace info parsed from INFO command
136    * @return
137    */
138   virtual DatabaseList getKeyspaceInfo();
139 
140   /**
141    * @brief getEnabledModules from INFO command
142    * @return
143    */
144   virtual QHash<QString, QString> getEnabledModules();
145 
146   /**
147    * @brief update internal structure for methods getServerVersion() and
148    * getKeyspaceInfo()
149    */
150   virtual void refreshServerInfo(std::function<void()> callback);
151 
152   /*
153    * Hi-Level Operations API
154    */
155   /**
156    * @brief RawKeysList
157    */
158   typedef QList<QByteArray> RawKeysList;
159   typedef std::function<void(const RawKeysList &, const QString &)>
160       RawKeysListCallback;
161 
162   /**
163    * @brief getDatabaseKeys - async keys loading
164    * @param callback
165    * @param pattern
166    * @param dbIndex
167    */
168   virtual void getDatabaseKeys(RawKeysListCallback callback,
169                                const QString &pattern = QString("*"),
170                                int dbIndex = 0, long scanLimit = 10000);
171 
172   typedef QList<QPair<QByteArray, ulong>> RootNamespaces;
173   typedef QList<QByteArray> RootKeys;
174   typedef QPair<RootNamespaces, RootKeys> NamespaceItems;
175   typedef std::function<void(const NamespaceItems &, const QString &)>
176       NamespaceItemsCallback;
177 
178   virtual void getNamespaceItems(NamespaceItemsCallback callback,
179                                  const QString &nsSeparator,
180                                  const QString &pattern = QString("*"),
181                                  int dbIndex = 0);
182 
183   /**
184    * @brief getClusterKeys - async keys loading from all cluster nodes
185    * @param callback
186    * @param pattern
187    */
188   virtual void getClusterKeys(RawKeysListCallback callback,
189                               const QString &pattern);
190 
191   /**
192    * @brief flushDbKeys - Remove keys on all master nodes
193    */
194   virtual void flushDbKeys(int dbIndex,
195                            std::function<void(const QString &)> callback);
196 
197   typedef QPair<QString, int> Host;
198   typedef QList<Host> HostList;
199 
200   /**
201    * @brief getMasterNodes - Get master nodes of cluster
202    * @return
203    */
204   void getMasterNodes(std::function<void(HostList, const QString& err)> callback);
205 
206   typedef QPair<int, int> Range;
207   typedef QMap<Range, Host> ClusterSlots;
208 
209   /**
210    * @brief getClusterSlots
211    * @return
212    */
213   void getClusterSlots(std::function<void(ClusterSlots, const QString& err)> callback);
214 
215   /**
216    * @brief getClisterHost
217    * @param cmd
218    * @return
219    */
220   Host getClusterHost(const Command &cmd);
221 
222   /**
223    * @brief isCommandSupported
224    * @param rawCmd
225    * @return
226    */
227   virtual QFuture<bool> isCommandSupported(QList<QByteArray> rawCmd);
228 
229   /*
230    * Command execution API
231    */
232   /**
233    * @brief command
234    * @param cmd
235    */
236   QFuture<Response> command(const Command &cmd);
237 
238   /**
239    * @brief Execute command without callback in async mode.
240    * @param rawCmd
241    * @param db
242    */
243   QFuture<Response> command(QList<QByteArray> rawCmd, int db = -1);
244 
245   /**
246    * @brief Execute command with callback in async mode.
247    * @param rawCmd
248    * @param owner
249    * @param callback
250    * @param db
251    */
252   QFuture<Response> command(QList<QByteArray> rawCmd, QObject *owner,
253                             RedisClient::Command::Callback callback,
254                             int db = -1, bool priorityCmd=false);
255 
256   /**
257    * @brief Hi-level wrapper with basic error handling
258    * @param rawCmd
259    * @param owner
260    * @param db
261    * @param callback
262    * @param errback
263    */
264   inline QFuture<Response> cmd(
265       QList<QByteArray> rawCmd, QObject *owner, int db,
266       std::function<void(const RedisClient::Response &)> callback,
267       std::function<void(const QString &)> errback,
268       bool hiPriorityCmd=false) {
269     try {
270       return this->command(
271           rawCmd, owner,
272           [callback, errback](RedisClient::Response r, QString err) {
273             if (err.size() > 0) return errback(err);
274             if (r.isErrorMessage()) return errback(r.value().toString());
275 
276             return callback(r);
277           },
278           db, hiPriorityCmd);
catch(const RedisClient::Connection::Exception & e)279     } catch (const RedisClient::Connection::Exception &e) {
280       errback(QString(e.what()));
281       return QFuture<Response>();
282     }
283   }
284 
285   /**
286    * @brief pipelinedCmd
287    * @param rawCmds
288    * @param owner
289    * @param db
290    * @param callback
291    */
292   void pipelinedCmd(
293       QList<QList<QByteArray>> rawCmds, QObject *owner, int db,
294       std::function<void(const RedisClient::Response &, QString err)> callback);
295 
296   /**
297    * @brief CollectionCallback
298    */
299   typedef std::function<void(QVariant, QString err)> CollectionCallback;
300 
301   /**
302    * @brief IncrementalCollectionCallback
303    */
304   typedef std::function<void(QVariant, QString err, bool final)>
305       IncrementalCollectionCallback;
306 
307   /**
308    * @brief retrieveCollection
309    * @param cmd
310    * @param callback
311    */
312   virtual void retrieveCollection(const ScanCommand &cmd,
313                                   CollectionCallback callback);
314 
315   /**
316    * @brief retrieveCollection
317    * @param cmd
318    * @param callback
319    */
320   virtual void retrieveCollectionIncrementally(
321       const ScanCommand &cmd, IncrementalCollectionCallback callback);
322 
323   /**
324    * @brief runCommand
325    * @param cmd
326    * @return QFuture<Response>
327    */
328   virtual QFuture<Response> runCommand(const Command &cmd);
329 
330   /**
331    * @brief runCommands
332    * @param commands
333    */
334   virtual void runCommands(const QList<Command> &cmd);
335 
336   /**
337    * @brief waitForIdle - Wait until all commands in queue will be processed
338    * @param timeout - in milliseconds
339    */
340   bool waitForIdle(uint timeout);
341 
342   /**
343    * @brief create new connection object with same settings
344    * @return
345    */
346   virtual QSharedPointer<Connection> clone(bool copyServerInfo = true) const;
347 
348   /*
349    * Low level functions for modification
350    * commands execution.
351    */
352   void setTransporter(QSharedPointer<AbstractTransporter>);
353   QSharedPointer<AbstractTransporter> getTransporter() const;
354 
355   void callAfterConnect(std::function<void(const QString &err)> callback);
356 
357  signals:
358   void addCommandsToWorker(const QList<Command> &);
359   void error(const QString &);
360   void log(const QString &);
361   void connected();
362   void shutdownStart();
363   void disconnected();
364   void authOk();
365   void authError(const QString &);
366 
367   // Cluster & Sentinel
368   void reconnectTo(const QString &host, int port);
369 
370  protected:
371   void createTransporter();
372   bool isTransporterRunning();
373 
374   void processScanCommand(
375       const ScanCommand &cmd, CollectionCallback callback,
376       QSharedPointer<QVariantList> result = QSharedPointer<QVariantList>(),
377       bool incrementalProcessing = false);
378 
379   void changeCurrentDbNumber(int db);
380 
381   void clusterConnectToNextMasterNode(
382       std::function<void(const QString &err)> callback);
383 
384   bool hasNotVisitedClusterNodes() const;
385 
386   void sentinelConnectToMaster();
387 
388   void rawClusterSlots(std::function<void(QVariantList, const QString&)> callback);
389 
390  protected slots:
391   void auth();
392 
393  protected:
394   ConnectionConfig m_config;
395   QSharedPointer<QThread> m_transporterThread;
396   QSharedPointer<AbstractTransporter> m_transporter;
397 
398   int m_dbNumber;
399   ServerInfo m_serverInfo;
400   Mode m_currentMode;
401   QMutex m_dbNumberMutex;
402   QMutex m_blockingOp;
403   bool m_autoConnect;
404   bool m_stoppingTransporter;
405   RawKeysListCallback m_collectClusterNodeKeys;
406   RedisClient::Command::Callback m_cmdCallback;
407   QSharedPointer<HostList> m_notVisitedMasterNodes;
408   ClusterSlots m_clusterSlots;
409 };
410 }  // namespace RedisClient
411