1 #include "stream.h"
2 #include <QJsonDocument>
3
StreamKeyModel(QSharedPointer<RedisClient::Connection> connection,QByteArray fullPath,int dbIndex,long long ttl)4 StreamKeyModel::StreamKeyModel(
5 QSharedPointer<RedisClient::Connection> connection, QByteArray fullPath,
6 int dbIndex, long long ttl)
7 : KeyModel(connection, fullPath, dbIndex, ttl, "XLEN", QByteArray()) {}
8
type()9 QString StreamKeyModel::type() { return "stream"; }
10
getColumnNames()11 QStringList StreamKeyModel::getColumnNames() {
12 return QStringList() << "rowNumber"
13 << "id"
14 << "value";
15 }
16
getRoles()17 QHash<int, QByteArray> StreamKeyModel::getRoles() {
18 QHash<int, QByteArray> roles;
19 roles[Roles::RowNumber] = "rowNumber";
20 roles[Roles::ID] = "id";
21 roles[Roles::Value] = "value";
22 return roles;
23 }
24
getData(int rowIndex,int dataRole)25 QVariant StreamKeyModel::getData(int rowIndex, int dataRole) {
26 if (!isRowLoaded(rowIndex)) return QVariant();
27
28 switch (dataRole) {
29 case Value:
30 return QJsonDocument::fromVariant(m_rowsCache[rowIndex].second)
31 .toJson(QJsonDocument::Compact);
32 case ID:
33 return m_rowsCache[rowIndex].first;
34 case RowNumber:
35 return rowIndex;
36 }
37
38 return QVariant();
39 }
40
addRow(const QVariantMap & row,ValueEditor::Model::Callback c)41 void StreamKeyModel::addRow(const QVariantMap &row,
42 ValueEditor::Model::Callback c) {
43 if (!isRowValid(row)) {
44 c(QCoreApplication::translate("RDM", "Invalid row"));
45 return;
46 }
47
48 QList<QByteArray> cmd = {"XADD", m_keyFullPath, row["id"].toByteArray()};
49
50 QJsonParseError err;
51 QJsonDocument jsonValues =
52 QJsonDocument::fromJson(row["value"].toByteArray(), &err);
53
54 if (err.error != QJsonParseError::NoError || !jsonValues.isObject()) {
55 return c(QCoreApplication::translate("RDM", "Invalid row"));
56 }
57
58 auto valuesObject = jsonValues.object();
59
60 for (auto key : valuesObject.keys()) {
61 cmd.append(key.toUtf8());
62 cmd.append(valuesObject[key].toVariant().toString().toUtf8());
63 }
64
65 executeCmd(cmd, c);
66 }
67
updateRow(int,const QVariantMap &,ValueEditor::Model::Callback)68 void StreamKeyModel::updateRow(int, const QVariantMap &,
69 ValueEditor::Model::Callback) {
70 //NOTE(u_glide): Redis Streams doesn't support editing (yet?)
71 }
72
removeRow(int i,ValueEditor::Model::Callback c)73 void StreamKeyModel::removeRow(int i, ValueEditor::Model::Callback c) {
74 if (!isRowLoaded(i)) return;
75
76 executeCmd({"XDEL", m_keyFullPath, m_rowsCache[i].first}, c);
77 }
78
loadRowsCount(ValueEditor::Model::Callback c)79 void StreamKeyModel::loadRowsCount(ValueEditor::Model::Callback c)
80 {
81 executeCmd(
82 {"XINFO", "STREAM", m_keyFullPath}, c,
83 [this](RedisClient::Response r, Callback c) {
84 auto info = r.value().toList();
85 auto it = info.begin();
86
87 while (it != info.end()) {
88 if (!it->canConvert(QMetaType::QByteArray)) {
89 continue;
90 }
91
92 QByteArray propertyName = it->toByteArray();
93
94 it++;
95
96 if (it == info.end())
97 break;
98
99 if (propertyName == QByteArray("length")) {
100 m_rowCount = it->toLongLong();
101 } else if (propertyName == QByteArray("first-entry") ||
102 propertyName == QByteArray("last-entry")) {
103 auto list = it->toList();
104
105 if (list.size() > 0) {
106 m_filters[QString::fromLatin1(propertyName)] = list[0];
107 }
108 }
109
110 it++;
111 }
112
113 c(QString());
114 },
115 RedisClient::Response::Type::Array);
116 }
117
addLoadedRowsToCache(const QVariantList & rows,QVariant rowStartId)118 int StreamKeyModel::addLoadedRowsToCache(const QVariantList &rows,
119 QVariant rowStartId) {
120 QList<QPair<QByteArray, QVariant>> result;
121
122 for (QVariantList::const_iterator item = rows.begin(); item != rows.end();
123 ++item) {
124 QPair<QByteArray, QVariant> value;
125 auto rowValues = item->toList();
126 value.first = rowValues[0].toByteArray();
127
128 QVariantList valuesList = rowValues[1].toList();
129 QVariantMap mappedVal;
130
131 for (QVariantList::const_iterator valItem = valuesList.begin();
132 valItem != valuesList.end(); ++valItem) {
133 auto valKey = valItem->toByteArray();
134 valItem++;
135
136 // NOTE(u_glide): Temporary workaround for https://bugreports.qt.io/browse/QTBUG-84739
137 mappedVal[valKey] = QString::fromUtf8(valItem->toByteArray());
138 }
139
140 value.second = mappedVal;
141 result.push_back(value);
142 }
143
144 auto rowStart = rowStartId.toLongLong();
145 m_rowsCache.addLoadedRange({rowStart, rowStart + result.size() - 1}, result);
146
147 return result.size();
148 }
149
getRangeCmd(QVariant rowStartId,unsigned long count)150 QList<QByteArray> StreamKeyModel::getRangeCmd(QVariant rowStartId,
151 unsigned long count) {
152 QList<QByteArray> cmd;
153 cmd << "XREVRANGE" << m_keyFullPath;
154
155 if (filter("end").isNull()) { // end
156 cmd << "+";
157 } else {
158 cmd << QString::number(filter("end").toLongLong()).toLatin1();
159 }
160
161 if (filter("start").isNull()) { // start
162 unsigned long rowStart = rowStartId.toULongLong();
163
164 if (isRowLoaded(rowStart - 1)) {
165 cmd << m_rowsCache[rowStart - 1].first;
166 } else {
167 cmd << "-";
168 }
169 } else {
170 cmd << QString::number(filter("start").toLongLong()).toLatin1();
171 }
172
173 return cmd << "COUNT" << QString::number(count).toLatin1();
174 }
175