1 /*
2     SPDX-FileCopyrightText: 2007 Tobias Koenig <tokoe@kde.org>
3 
4     SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #pragma once
8 
9 #include "akonadicore_export.h"
10 #include "changenotificationdependenciesfactory_p.h"
11 #include "collection.h"
12 #include "collectionfetchscope.h"
13 #include "collectionstatisticsjob.h"
14 #include "commandbuffer_p.h"
15 #include "connection_p.h"
16 #include "entitycache_p.h"
17 #include "item.h"
18 #include "itemfetchscope.h"
19 #include "job.h"
20 #include "monitor.h"
21 #include "servermanager.h"
22 #include "tagfetchscope.h"
23 
24 #include "private/protocol_p.h"
25 
26 #include <QObject>
27 #include <QTimer>
28 
29 #include <QMimeDatabase>
30 #include <QMimeType>
31 #include <QPointer>
32 
33 namespace Akonadi
34 {
// Forward declarations of types that this header only names.
// Monitor is the public class that MonitorPrivate points back to via q_ptr and
// whose member functions (signals) are used as SignalId keys.
class Monitor;
// ChangeNotification is only taken by const reference in
// MonitorPrivate::emitDebugChangeNotification().
class ChangeNotification;
37 
38 // A helper struct to wrap pointer to member function (which cannot be contained
39 // in a regular pointer)
40 struct SignalId {
41     constexpr SignalId() = default;
42 
43     using Unit = uint;
44     static constexpr int Size = sizeof(&Monitor::itemAdded) / sizeof(Unit);
45     Unit data[sizeof(&Monitor::itemAdded) / sizeof(Unit)] = {0};
46 
47     inline bool operator==(SignalId other) const
48     {
49         for (int i = Size - 1; i >= 0; --i) {
50             if (data[i] != other.data[i]) {
51                 return false;
52             }
53         }
54         return true;
55     }
56 };
57 
qHash(SignalId sig)58 inline uint qHash(SignalId sig)
59 {
60     // The 4 LSBs of the address should be enough to give us a good hash
61     return sig.data[SignalId::Size - 1];
62 }
63 
64 /**
65  * @internal
66  */
67 class AKONADICORE_EXPORT MonitorPrivate
68 {
69 public:
70     enum ListenerAction {
71         AddListener,
72         RemoveListener,
73     };
74 
75     MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent);
76     virtual ~MonitorPrivate();
77     void init();
78 
79     Monitor *q_ptr;
80     Q_DECLARE_PUBLIC(Monitor)
81     ChangeNotificationDependenciesFactory *dependenciesFactory = nullptr;
82     QPointer<Connection> ntfConnection;
83     Collection::List collections;
84     QSet<QByteArray> resources;
85     QSet<Item::Id> items;
86     QSet<Tag::Id> tags;
87     QSet<Monitor::Type> types;
88     QSet<QString> mimetypes;
89     bool monitorAll;
90     bool exclusive;
91     QList<QByteArray> sessions;
92     ItemFetchScope mItemFetchScope;
93     TagFetchScope mTagFetchScope;
94     CollectionFetchScope mCollectionFetchScope;
95     bool mFetchChangedOnly;
96     Session *session = nullptr;
97     CollectionCache *collectionCache = nullptr;
98     ItemListCache *itemCache = nullptr;
99     TagListCache *tagCache = nullptr;
100     QMimeDatabase mimeDatabase;
101     QHash<SignalId, quint16> listeners;
102 
103     CommandBuffer mCommandBuffer;
104 
105     Protocol::ModifySubscriptionCommand::ModifiedParts pendingModificationChanges;
106     Protocol::ModifySubscriptionCommand pendingModification;
107     QTimer *pendingModificationTimer = nullptr;
108     bool monitorReady = false;
109 
110     // The waiting list
111     QQueue<Protocol::ChangeNotificationPtr> pendingNotifications;
112     // The messages for which data is currently being fetched
113     QQueue<Protocol::ChangeNotificationPtr> pipeline;
114     // In a pure Monitor, the pipeline contains items that were dequeued from pendingNotifications.
115     // The ordering [pipeline] [pendingNotifications] is kept at all times.
116     // [] [A B C]  -> [A B] [C]  -> [B] [C] -> [B C] [] -> [C] [] -> []
117     // In a ChangeRecorder, the pipeline contains one item only, and not dequeued yet.
118     // [] [A B C] -> [A] [A B C] -> [] [A B C] -> (changeProcessed) [] [B C] -> [B] [B C] etc...
119 
120     bool fetchCollection;
121     bool fetchCollectionStatistics;
122     bool collectionMoveTranslationEnabled;
123 
124     // Virtual methods for ChangeRecorder
notificationsEnqueued(int)125     virtual void notificationsEnqueued(int)
126     {
127     }
notificationsErased()128     virtual void notificationsErased()
129     {
130     }
131 
132     // Virtual so it can be overridden in FakeMonitor.
133     virtual bool connectToNotificationManager();
134     void disconnectFromNotificationManager();
135 
136     void dispatchNotifications();
137     void flushPipeline();
138 
139     bool ensureDataAvailable(const Protocol::ChangeNotificationPtr &msg);
140     /**
141      * Sends out the change notification @p msg.
142      * @param msg the change notification to send
143      * @return @c true if the notification was actually send to someone, @c false if no one was listening.
144      */
145     virtual bool emitNotification(const Protocol::ChangeNotificationPtr &msg);
146     void updatePendingStatistics(const Protocol::ChangeNotificationPtr &msg);
147     void invalidateCaches(const Protocol::ChangeNotificationPtr &msg);
148 
149     /** Used by ResourceBase to inform us about collection changes before the notifications are emitted,
150         needed to avoid the missing RID race on change replay.
151     */
152     void invalidateCache(const Collection &col);
153 
154     /// Virtual so that ChangeRecorder can set it to 0 and handle the pipeline itself
155     virtual int pipelineSize() const;
156 
157     // private Q_SLOTS
158     void dataAvailable();
159     void slotSessionDestroyed(QObject *object);
160     void slotStatisticsChangedFinished(KJob *job);
161     void slotFlushRecentlyChangedCollections();
162 
163     /**
164       Returns whether a message was appended to @p notificationQueue
165     */
166     int translateAndCompress(QQueue<Protocol::ChangeNotificationPtr> &notificationQueue, const Protocol::ChangeNotificationPtr &msg);
167 
168     void handleCommands();
169 
170     virtual void slotNotify(const Protocol::ChangeNotificationPtr &msg);
171 
172     /**
173      * Sends out a change notification for an item.
174      * @return @c true if the notification was actually send to someone, @c false if no one was listening.
175      */
176     bool emitItemsNotification(const Protocol::ItemChangeNotification &msg,
177                                const Item::List &items = Item::List(),
178                                const Collection &collection = Collection(),
179                                const Collection &collectionDest = Collection());
180     /**
181      * Sends out a change notification for a collection.
182      * @return @c true if the notification was actually send to someone, @c false if no one was listening.
183      */
184     bool emitCollectionNotification(const Protocol::CollectionChangeNotification &msg,
185                                     const Collection &col = Collection(),
186                                     const Collection &par = Collection(),
187                                     const Collection &dest = Collection());
188 
189     bool emitTagNotification(const Protocol::TagChangeNotification &msg, const Tag &tags);
190 
191     bool emitRelationNotification(const Protocol::RelationChangeNotification &msg, const Relation &relation);
192 
193     bool emitSubscriptionChangeNotification(const Protocol::SubscriptionChangeNotification &msg, const NotificationSubscriber &subscriber);
194 
195     bool emitDebugChangeNotification(const Protocol::DebugChangeNotification &msg, const ChangeNotification &ntf);
196 
197     void serverStateChanged(Akonadi::ServerManager::State state);
198 
199     /**
200      * This method is called by the ChangeMediator to enforce an invalidation of the passed collection.
201      */
202     void invalidateCollectionCache(qint64 collectionId);
203 
204     /**
205      * This method is called by the ChangeMediator to enforce an invalidation of the passed item.
206      */
207     void invalidateItemCache(qint64 itemId);
208 
209     /**
210      * This method is called by the ChangeMediator to enforce an invalidation of the passed tag.
211      */
212     void invalidateTagCache(qint64 tagId);
213 
214     void scheduleSubscriptionUpdate();
215     void slotUpdateSubscription();
216 
217     void updateListeners(QMetaMethod signal, ListenerAction action);
218 
updateListener(Signal signal,ListenerAction action)219     template<typename Signal> void updateListener(Signal signal, ListenerAction action)
220     {
221         auto it = listeners.find(signalId(signal));
222         if (action == AddListener) {
223             if (it == listeners.end()) {
224                 it = listeners.insert(signalId(signal), 0);
225             }
226             ++(*it);
227         } else {
228             if (--(*it) == 0) {
229                 listeners.erase(it);
230             }
231         }
232     }
233 
234     static Protocol::ModifySubscriptionCommand::ChangeType monitorTypeToProtocol(Monitor::Type type);
235 
236     /**
237       @brief Class used to determine when to purge items in a Collection
238 
239       The buffer method can be used to buffer a Collection. This may cause another Collection
240       to be purged if it is removed from the buffer.
241 
242       The purge method is used to purge a Collection from the buffer, but not the model.
243       This is used for example, to not buffer Collections anymore if they get referenced,
244       and to ensure that one Collection does not appear twice in the buffer.
245 
246       Check whether a Collection is buffered using the isBuffered method.
247     */
248     class AKONADI_TESTS_EXPORT PurgeBuffer
249     {
250         // Buffer the most recent 10 unreferenced Collections
251         static const int MAXBUFFERSIZE = 10;
252 
253     public:
PurgeBuffer()254         explicit PurgeBuffer()
255         {
256         }
257 
258         /**
259           Adds @p id to the Collections to be buffered
260 
261           @returns The collection id which was removed form the buffer or -1 if none.
262         */
263         Collection::Id buffer(Collection::Id id);
264 
265         /**
266         Removes @p id from the Collections being buffered
267         */
268         void purge(Collection::Id id);
269 
isBuffered(Collection::Id id)270         bool isBuffered(Collection::Id id) const
271         {
272             return m_buffer.contains(id);
273         }
274 
275         static int buffersize();
276 
277     private:
278         QQueue<Collection::Id> m_buffer;
279     } m_buffer;
280 
281     QHash<Collection::Id, int> refCountMap;
282     bool useRefCounting;
283     void ref(Collection::Id id);
284     Collection::Id deref(Collection::Id id);
285 
286     /**
287      * Returns true if the collection is monitored by monitor.
288      *
289      * A collection is always monitored if useRefCounting is false.
290      * If ref counting is used, the collection is only monitored,
291      * if the collection is either in refCountMap or m_buffer.
292      * If ref counting is used and the collection is not in refCountMap or m_buffer,
293      * no updates for the contained items are emitted, because they are lazily ignored.
294      */
295     bool isMonitored(Collection::Id colId) const;
296 
297 private:
298     // collections that need a statistics update
299     QSet<Collection::Id> recentlyChangedCollections;
300     QTimer statisticsCompressionTimer;
301 
302     /**
303       @returns True if @p msg should be ignored. Otherwise appropriate signals are emitted for it.
304     */
305     bool isLazilyIgnored(const Protocol::ChangeNotificationPtr &msg, bool allowModifyFlagsConversion = false) const;
306 
307     /**
308       Sets @p needsSplit to True when @p msg contains more than one item and there's at least one
309       listener that does not support batch operations. Sets @p batchSupported to True when
310       there's at least one listener that supports batch operations.
311     */
312     void checkBatchSupport(const Protocol::ChangeNotificationPtr &msg, bool &needsSplit, bool &batchSupported) const;
313 
314     Protocol::ChangeNotificationList splitMessage(const Protocol::ItemChangeNotification &msg, bool legacy) const;
315 
isCollectionMonitored(Collection::Id collection)316     bool isCollectionMonitored(Collection::Id collection) const
317     {
318         if (collection < 0) {
319             return false;
320         }
321         if (collections.contains(Collection(collection))) {
322             return true;
323         }
324         if (collections.contains(Collection::root())) {
325             return true;
326         }
327         return false;
328     }
329 
isMimeTypeMonitored(const QString & mimetype)330     bool isMimeTypeMonitored(const QString &mimetype) const
331     {
332         if (mimetypes.contains(mimetype)) {
333             return true;
334         }
335 
336         const QMimeType mimeType = mimeDatabase.mimeTypeForName(mimetype);
337         if (!mimeType.isValid()) {
338             return false;
339         }
340 
341         for (const QString &mt : mimetypes) {
342             if (mimeType.inherits(mt)) {
343                 return true;
344             }
345         }
346 
347         return false;
348     }
349 
isMoveDestinationResourceMonitored(const T & msg)350     template<typename T> bool isMoveDestinationResourceMonitored(const T &msg) const
351     {
352         if (msg.operation() != T::Move) {
353             return false;
354         }
355         return resources.contains(msg.destinationResource());
356     }
357 
fetchStatistics(Collection::Id colId)358     void fetchStatistics(Collection::Id colId)
359     {
360         auto job = new CollectionStatisticsJob(Collection(colId), session);
361         QObject::connect(job, &KJob::result, q_ptr, [this](KJob *job) {
362             slotStatisticsChangedFinished(job);
363         });
364     }
365 
366     void notifyCollectionStatisticsWatchers(Collection::Id collection, const QByteArray &resource);
367     bool fetchCollections() const;
368     bool fetchItems() const;
369 
370     // A hack to "cast" pointer to member function to something we can easily
371     // use as a key in the hashtable
signalId(Signal signal)372     template<typename Signal> constexpr SignalId signalId(Signal signal) const
373     {
374         union {
375             Signal in;
376             SignalId out;
377         } h = {signal};
378         return h.out;
379     }
380 
hasListeners(Signal signal)381     template<typename Signal> bool hasListeners(Signal signal) const
382     {
383         auto it = listeners.find(signalId(signal));
384         return it != listeners.end();
385     }
386 
emitToListeners(Signal signal,Args...args)387     template<typename Signal, typename... Args> bool emitToListeners(Signal signal, Args... args)
388     {
389         if (hasListeners(signal)) {
390             Q_EMIT(q_ptr->*signal)(std::forward<Args>(args)...);
391             return true;
392         }
393         return false;
394     }
395 };
396 
397 }
398 
399