1import random
2import textwrap
3import uuid
4from urllib.parse import quote as urlquote
5from urllib.parse import unquote as urlunquote
6
7import pytest
8
9from .. import assert_item_equals
10from .. import EVENT_TEMPLATE
11from .. import normalize_item
12from .. import TASK_TEMPLATE
13from .. import VCARD_TEMPLATE
14from vdirsyncer import exceptions
15from vdirsyncer.storage.base import normalize_meta_value
16from vdirsyncer.vobject import Item
17
18
def get_server_mixin(server_name):
    """Return the ``ServerMixin`` object of the given server module.

    The module is looked up as ``<package>.servers.<server_name>``, where
    ``<package>`` is the package this module lives in.

    :param server_name: Name of the submodule inside the ``servers``
        subpackage.
    :returns: The ``ServerMixin`` attribute of the imported module.
    :raises ImportError: If the server module cannot be imported.
    :raises AttributeError: If the module defines no ``ServerMixin``.
    """
    # Local imports: ``importlib`` is only needed here, and the package name
    # is resolved lazily at call time, as before.
    import importlib

    from . import __name__ as base

    # ``import_module`` returns the leaf module directly, which avoids the
    # ``__import__(..., fromlist=[""])`` workaround.
    server_module = importlib.import_module(f"{base}.servers.{server_name}")
    return server_module.ServerMixin
24
25
def format_item(item_template, uid=None):
    """Render ``item_template`` into an ``Item`` using a fresh random value.

    The template is filled in via ``str.format`` with two fields: ``r``, a
    newly drawn random float, and ``uid``, which is the given ``uid`` or, when
    that is falsy, the same random float.

    :param item_template: Format string with ``{r}`` and/or ``{uid}`` fields.
    :param uid: Optional UID to put into the item.
    :returns: An ``Item`` wrapping the rendered text.
    """
    nonce = random.random()
    if not uid:
        # No usable UID supplied: reuse the random value so each item gets
        # its own one.
        uid = nonce
    rendered = item_template.format(r=nonce, uid=uid)
    return Item(rendered)
30
31
class StorageTests:
    """Generic test suite to be run against a storage implementation.

    Concrete suites are expected to subclass this, set ``storage_class`` and
    override the ``get_storage_args`` fixture; the base versions are only
    placeholders (``None`` / ``NotImplementedError``).
    """

    # The storage class under test; instantiated with the kwargs produced by
    # the ``get_storage_args`` fixture.
    storage_class = None
    # When false, tests depending on ``requires_collections`` are skipped.
    supports_collections = True
    # When false, tests depending on ``requires_metadata`` are skipped.
    supports_metadata = True

    @pytest.fixture(params=["VEVENT", "VTODO", "VCARD"])
    def item_type(self, request):
        """Parametrize with all supported item types."""
        return request.param

    @pytest.fixture
    def get_storage_args(self):
        """
        Return a function with the following properties:

        :param collection: The name of the collection to create and use.

        The tests call it with no arguments, with a collection name, or with
        ``collection=None``, and unpack its result as keyword arguments for
        ``storage_class`` (or its ``discover``/``create_collection``).
        """
        raise NotImplementedError()

    @pytest.fixture
    def s(self, get_storage_args):
        """Return a storage instance built from the default storage args."""
        return self.storage_class(**get_storage_args())

    @pytest.fixture
    def get_item(self, item_type):
        """Return a factory creating items of the parametrized ``item_type``.

        The factory forwards its keyword arguments to ``format_item``.
        """
        template = {
            "VEVENT": EVENT_TEMPLATE,
            "VTODO": TASK_TEMPLATE,
            "VCARD": VCARD_TEMPLATE,
        }[item_type]

        return lambda **kw: format_item(template, **kw)

    @pytest.fixture
    def requires_collections(self):
        """Skip the requesting test if collections are unsupported."""
        if not self.supports_collections:
            pytest.skip("This storage does not support collections.")

    @pytest.fixture
    def requires_metadata(self):
        """Skip the requesting test if metadata is unsupported."""
        if not self.supports_metadata:
            pytest.skip("This storage does not support metadata.")

    @staticmethod
    def _upload_with_etag(s, item):
        """Upload ``item`` to ``s`` and return its ``(href, etag)``.

        ``upload`` may report ``None`` as etag; in that case the etag is
        looked up with a follow-up ``get`` so callers can use it for
        comparisons and conditional requests.
        """
        href, etag = s.upload(item)
        if etag is None:
            _, etag = s.get(href)
        return href, etag

    def test_generic(self, s, get_item):
        """Uploaded items are listed, present and retrievable by href."""
        items = [get_item() for _ in range(9)]
        hrefs = sorted(self._upload_with_etag(s, item) for item in items)
        assert hrefs == sorted(s.list())
        for href, etag in hrefs:
            assert isinstance(href, (str, bytes))
            assert isinstance(etag, (str, bytes))
            assert s.has(href)
            _, etag2 = s.get(href)
            assert etag == etag2

    def test_empty_get_multi(self, s):
        """``get_multi`` with no hrefs yields nothing."""
        assert list(s.get_multi([])) == []

    def test_get_multi_duplicates(self, s, get_item):
        """Requesting the same href twice yields the item only once."""
        href, etag = self._upload_with_etag(s, get_item())
        # The single-element unpacking doubles as the "exactly one" check.
        ((href2, _, etag2),) = s.get_multi([href] * 2)
        assert href2 == href
        assert etag2 == etag

    def test_upload_already_existing(self, s, get_item):
        """Uploading the same item twice fails the precondition."""
        item = get_item()
        s.upload(item)
        with pytest.raises(exceptions.PreconditionFailed):
            s.upload(item)

    def test_upload(self, s, get_item):
        """An uploaded item can be read back unchanged."""
        item = get_item()
        href, _ = s.upload(item)
        assert_item_equals(s.get(href)[0], item)

    def test_update(self, s, get_item):
        """Updating with the current etag replaces the stored item."""
        item = get_item()
        href, etag = self._upload_with_etag(s, item)
        assert_item_equals(s.get(href)[0], item)

        new_item = get_item(uid=item.uid)
        new_etag = s.update(href, new_item, etag)
        if new_etag is None:
            _, new_etag = s.get(href)
        # See https://github.com/pimutils/vdirsyncer/issues/48
        assert isinstance(new_etag, (bytes, str))
        assert_item_equals(s.get(href)[0], new_item)

    def test_update_nonexisting(self, s, get_item):
        """Updating an unknown href fails the precondition."""
        item = get_item()
        with pytest.raises(exceptions.PreconditionFailed):
            s.update("huehue", item, '"123"')

    def test_wrong_etag(self, s, get_item):
        """Update and delete with a wrong etag fail the precondition."""
        item = get_item()
        href, _ = s.upload(item)
        with pytest.raises(exceptions.PreconditionFailed):
            s.update(href, item, '"lolnope"')
        with pytest.raises(exceptions.PreconditionFailed):
            s.delete(href, '"lolnope"')

    def test_delete(self, s, get_item):
        """Deleting the only item leaves the storage empty."""
        # Resolve a missing upload etag first, like the other tests do, so
        # ``delete`` is always called with a real etag.
        href, etag = self._upload_with_etag(s, get_item())
        s.delete(href, etag)
        assert not list(s.list())

    def test_delete_nonexisting(self, s, get_item):
        """Deleting an unknown href fails the precondition."""
        with pytest.raises(exceptions.PreconditionFailed):
            s.delete("1", '"123"')

    def test_list(self, s, get_item):
        """``list`` is empty at first and then reports the uploaded item."""
        assert not list(s.list())
        href, etag = self._upload_with_etag(s, get_item())
        assert list(s.list()) == [(href, etag)]

    def test_has(self, s, get_item):
        """``has`` tracks upload and deletion of an item."""
        assert not s.has("asd")
        # Resolve a missing upload etag first, like the other tests do, so
        # ``delete`` is always called with a real etag.
        href, etag = self._upload_with_etag(s, get_item())
        assert s.has(href)
        assert not s.has("asd")
        s.delete(href, etag)
        assert not s.has(href)

    def test_update_others_stay_the_same(self, s, get_item):
        """Etags recorded at upload time still match after later uploads."""
        info = dict(self._upload_with_etag(s, get_item()) for _ in range(4))

        fetched = {href: etag for href, _, etag in s.get_multi(list(info))}
        assert fetched == info

    def test_repr(self, s, get_storage_args):
        """``repr`` mentions the class name; no instance name is set."""
        assert self.storage_class.__name__ in repr(s)
        assert s.instance_name is None

    def test_discover(self, requires_collections, get_storage_args, get_item):
        """``discover`` reports at least the collections that were set up."""
        collections = set()
        for i in range(1, 5):
            collection = f"test{i}"
            s = self.storage_class(**get_storage_args(collection=collection))
            assert not list(s.list())
            s.upload(get_item())
            collections.add(s.collection)

        actual = {
            c["collection"]
            for c in self.storage_class.discover(**get_storage_args(collection=None))
        }

        # Superset check: other collections may exist as well.
        assert actual >= collections

    def test_create_collection(self, requires_collections, get_storage_args, get_item):
        """A collection made by ``create_collection`` is usable."""
        dav_server = getattr(self, "dav_server", "")
        if dav_server in ("icloud", "fastmail", "davical"):
            pytest.skip("Manual cleanup would be necessary.")
        if dav_server == "radicale":
            pytest.skip("Radicale does not support collection creation")

        args = get_storage_args(collection=None)
        args["collection"] = "test"

        # ``create_collection`` returns the kwargs for instantiating the
        # storage on the new collection.
        s = self.storage_class(**self.storage_class.create_collection(**args))

        href = s.upload(get_item())[0]
        assert href in (listed_href for listed_href, _ in s.list())

    def test_discover_collection_arg(self, requires_collections, get_storage_args):
        """``discover`` rejects an explicit ``collection`` argument."""
        args = get_storage_args(collection="test2")
        with pytest.raises(TypeError) as excinfo:
            list(self.storage_class.discover(**args))

        assert "collection argument must not be given" in str(excinfo.value)

    def test_collection_arg(self, get_storage_args):
        """The ``collection`` argument is honoured, or rejected if unsupported."""
        if self.storage_class.storage_name.startswith("etesync"):
            pytest.skip("etesync uses UUIDs.")

        if self.supports_collections:
            s = self.storage_class(**get_storage_args(collection="test2"))
            # Can't do stronger assertion because of radicale, which needs a
            # fileextension to guess the collection type.
            assert "test2" in s.collection
        else:
            with pytest.raises(ValueError):
                self.storage_class(collection="ayy", **get_storage_args())

    def test_case_sensitive_uids(self, s, get_item):
        """UIDs differing only in case are stored as two separate items."""
        if s.storage_name == "filesystem":
            pytest.skip("Behavior depends on the filesystem.")

        uid = str(uuid.uuid4())
        s.upload(get_item(uid=uid.upper()))
        s.upload(get_item(uid=uid.lower()))
        hrefs = [href for href, _ in s.list()]
        assert len(hrefs) == 2
        assert len(set(hrefs)) == 2

    def test_specialchars(
        self, monkeypatch, requires_collections, get_storage_args, get_item
    ):
        """Spaces, ``@`` and non-ASCII text work in collection names and UIDs."""
        if getattr(self, "dav_server", "") == "radicale":
            pytest.skip("Radicale is fundamentally broken.")
        if getattr(self, "dav_server", "") in ("icloud", "fastmail"):
            pytest.skip("iCloud and FastMail reject this name.")

        # Make href generation the identity function -- presumably so the
        # special characters of the UID reach the href unmodified.
        monkeypatch.setattr("vdirsyncer.utils.generate_href", lambda x: x)

        uid = "test @ foo ät bar град сатану"
        collection = "test @ foo ät bar"

        s = self.storage_class(**get_storage_args(collection=collection))
        item = get_item(uid=uid)

        href, etag = s.upload(item)
        item2, etag2 = s.get(href)
        if etag is not None:
            assert etag2 == etag
            assert_item_equals(item2, item)

        # Exactly one item must be listed, with the etag ``get`` reported.
        ((_, etag3),) = s.list()
        assert etag2 == etag3

        # etesync uses UUIDs for collection names
        if self.storage_class.storage_name.startswith("etesync"):
            return

        assert collection in urlunquote(s.collection)
        if self.storage_class.storage_name.endswith("dav"):
            assert urlquote(uid, "/@:") in href

    def test_metadata(self, requires_metadata, s):
        """Metadata values can be set and read back as ``str``."""
        if not getattr(self, "dav_server", ""):
            assert not s.get_meta("color")
            assert not s.get_meta("displayname")

        try:
            s.set_meta("color", None)
            assert not s.get_meta("color")
            s.set_meta("color", "#ff0000")
            assert s.get_meta("color") == "#ff0000"
        except exceptions.UnsupportedMetadataError:
            # Deliberately tolerated: the color checks are simply skipped
            # for storages that raise this error.
            pass

        for x in ("hello world", "hello wörld"):
            s.set_meta("displayname", x)
            rv = s.get_meta("displayname")
            assert rv == x
            assert isinstance(rv, str)

    @pytest.mark.parametrize(
        "value",
        [
            None,
            "",
            "Hello there!",
            "Österreich",
            "中国",
            "한글",
            "42a4ec99-b1c2-4859-b142-759112f2ca50",
            "فلسطين",
        ],
    )
    def test_metadata_normalization(self, requires_metadata, s, value):
        """Values returned by ``get_meta`` are already normalized."""
        x = s.get_meta("displayname")
        assert x == normalize_meta_value(x)

        if not getattr(self, "dav_server", None):
            # ownCloud replaces "" with "unnamed"
            s.set_meta("displayname", value)
            assert s.get_meta("displayname") == normalize_meta_value(value)

    def test_recurring_events(self, s, item_type):
        """A calendar with a recurring event and an override round-trips."""
        if item_type != "VEVENT":
            pytest.skip("This storage instance doesn't support iCalendar.")

        uid = str(uuid.uuid4())
        # Two VEVENTs share one UID: the first carries a RECURRENCE-ID, the
        # second carries the RRULE.
        item = Item(
            textwrap.dedent(
                f"""
        BEGIN:VCALENDAR
        VERSION:2.0
        BEGIN:VEVENT
        DTSTART;TZID=UTC:20140325T084000Z
        DTEND;TZID=UTC:20140325T101000Z
        DTSTAMP:20140327T060506Z
        UID:{uid}
        RECURRENCE-ID;TZID=UTC:20140325T083000Z
        CREATED:20131216T033331Z
        DESCRIPTION:
        LAST-MODIFIED:20140327T060215Z
        LOCATION:
        SEQUENCE:1
        STATUS:CONFIRMED
        SUMMARY:test Event
        TRANSP:OPAQUE
        END:VEVENT
        BEGIN:VEVENT
        DTSTART;TZID=UTC:20140128T083000Z
        DTEND;TZID=UTC:20140128T100000Z
        RRULE:FREQ=WEEKLY;UNTIL=20141208T213000Z;BYDAY=TU
        DTSTAMP:20140327T060506Z
        UID:{uid}
        CREATED:20131216T033331Z
        DESCRIPTION:
        LAST-MODIFIED:20140222T101012Z
        LOCATION:
        SEQUENCE:0
        STATUS:CONFIRMED
        SUMMARY:Test event
        TRANSP:OPAQUE
        END:VEVENT
        END:VCALENDAR
        """
            ).strip()
        )

        href, _ = s.upload(item)

        item2, _ = s.get(href)
        assert normalize_item(item) == normalize_item(item2)
369