1 #include "stream.h"
2 #include <QJsonDocument>
3 
StreamKeyModel(QSharedPointer<RedisClient::Connection> connection,QByteArray fullPath,int dbIndex,long long ttl)4 StreamKeyModel::StreamKeyModel(
5     QSharedPointer<RedisClient::Connection> connection, QByteArray fullPath,
6     int dbIndex, long long ttl)
7     : KeyModel(connection, fullPath, dbIndex, ttl, "XLEN", QByteArray()) {}
8 
type()9 QString StreamKeyModel::type() { return "stream"; }
10 
getColumnNames()11 QStringList StreamKeyModel::getColumnNames() {
12   return QStringList() << "rowNumber"
13                        << "id"
14                        << "value";
15 }
16 
getRoles()17 QHash<int, QByteArray> StreamKeyModel::getRoles() {
18   QHash<int, QByteArray> roles;
19   roles[Roles::RowNumber] = "rowNumber";
20   roles[Roles::ID] = "id";
21   roles[Roles::Value] = "value";
22   return roles;
23 }
24 
getData(int rowIndex,int dataRole)25 QVariant StreamKeyModel::getData(int rowIndex, int dataRole) {
26   if (!isRowLoaded(rowIndex)) return QVariant();
27 
28   switch (dataRole) {
29     case Value:
30       return QJsonDocument::fromVariant(m_rowsCache[rowIndex].second)
31           .toJson(QJsonDocument::Compact);
32     case ID:
33       return m_rowsCache[rowIndex].first;
34     case RowNumber:
35       return rowIndex;
36   }
37 
38   return QVariant();
39 }
40 
addRow(const QVariantMap & row,ValueEditor::Model::Callback c)41 void StreamKeyModel::addRow(const QVariantMap &row,
42                             ValueEditor::Model::Callback c) {
43   if (!isRowValid(row)) {
44     c(QCoreApplication::translate("RDM", "Invalid row"));
45     return;
46   }
47 
48   QList<QByteArray> cmd = {"XADD", m_keyFullPath, row["id"].toByteArray()};
49 
50   QJsonParseError err;
51   QJsonDocument jsonValues =
52       QJsonDocument::fromJson(row["value"].toByteArray(), &err);
53 
54   if (err.error != QJsonParseError::NoError || !jsonValues.isObject()) {
55     return c(QCoreApplication::translate("RDM", "Invalid row"));
56   }
57 
58   auto valuesObject = jsonValues.object();
59 
60   for (auto key : valuesObject.keys()) {
61     cmd.append(key.toUtf8());
62     cmd.append(valuesObject[key].toVariant().toString().toUtf8());
63   }
64 
65   executeCmd(cmd, c);
66 }
67 
updateRow(int,const QVariantMap &,ValueEditor::Model::Callback)68 void StreamKeyModel::updateRow(int, const QVariantMap &,
69                                ValueEditor::Model::Callback) {
70     //NOTE(u_glide): Redis Streams doesn't support editing (yet?)
71 }
72 
removeRow(int i,ValueEditor::Model::Callback c)73 void StreamKeyModel::removeRow(int i, ValueEditor::Model::Callback c) {
74   if (!isRowLoaded(i)) return;
75 
76   executeCmd({"XDEL", m_keyFullPath, m_rowsCache[i].first}, c);
77 }
78 
loadRowsCount(ValueEditor::Model::Callback c)79 void StreamKeyModel::loadRowsCount(ValueEditor::Model::Callback c)
80 {
81   executeCmd(
82       {"XINFO", "STREAM", m_keyFullPath}, c,
83       [this](RedisClient::Response r, Callback c) {
84         auto info = r.value().toList();
85         auto it = info.begin();
86 
87         while (it != info.end()) {
88           if (!it->canConvert(QMetaType::QByteArray)) {
89             continue;
90           }
91 
92           QByteArray propertyName = it->toByteArray();
93 
94           it++;
95 
96           if (it == info.end())
97               break;
98 
99           if (propertyName == QByteArray("length")) {
100             m_rowCount = it->toLongLong();
101           } else if (propertyName == QByteArray("first-entry") ||
102                      propertyName == QByteArray("last-entry")) {
103             auto list = it->toList();
104 
105             if (list.size() > 0) {
106               m_filters[QString::fromLatin1(propertyName)] = list[0];
107             }
108           }
109 
110           it++;
111         }
112 
113         c(QString());
114       },
115       RedisClient::Response::Type::Array);
116 }
117 
addLoadedRowsToCache(const QVariantList & rows,QVariant rowStartId)118 int StreamKeyModel::addLoadedRowsToCache(const QVariantList &rows,
119                                          QVariant rowStartId) {
120   QList<QPair<QByteArray, QVariant>> result;
121 
122   for (QVariantList::const_iterator item = rows.begin(); item != rows.end();
123        ++item) {
124     QPair<QByteArray, QVariant> value;
125     auto rowValues = item->toList();
126     value.first = rowValues[0].toByteArray();
127 
128     QVariantList valuesList = rowValues[1].toList();
129     QVariantMap mappedVal;
130 
131     for (QVariantList::const_iterator valItem = valuesList.begin();
132          valItem != valuesList.end(); ++valItem) {
133       auto valKey = valItem->toByteArray();
134       valItem++;
135 
136       // NOTE(u_glide): Temporary workaround for https://bugreports.qt.io/browse/QTBUG-84739
137       mappedVal[valKey] = QString::fromUtf8(valItem->toByteArray());
138     }
139 
140     value.second = mappedVal;
141     result.push_back(value);
142   }
143 
144   auto rowStart = rowStartId.toLongLong();
145   m_rowsCache.addLoadedRange({rowStart, rowStart + result.size() - 1}, result);
146 
147   return result.size();
148 }
149 
getRangeCmd(QVariant rowStartId,unsigned long count)150 QList<QByteArray> StreamKeyModel::getRangeCmd(QVariant rowStartId,
151                                               unsigned long count) {
152   QList<QByteArray> cmd;
153   cmd << "XREVRANGE" << m_keyFullPath;
154 
155   if (filter("end").isNull()) {  // end
156     cmd << "+";
157   } else {
158     cmd << QString::number(filter("end").toLongLong()).toLatin1();
159   }
160 
161   if (filter("start").isNull()) {  // start
162     unsigned long rowStart = rowStartId.toULongLong();
163 
164     if (isRowLoaded(rowStart - 1)) {
165       cmd << m_rowsCache[rowStart - 1].first;
166     } else {
167        cmd << "-";
168     }
169   } else {
170     cmd << QString::number(filter("start").toLongLong()).toLatin1();
171   }
172 
173   return cmd << "COUNT" << QString::number(count).toLatin1();
174 }
175