1import random 2import textwrap 3import uuid 4from urllib.parse import quote as urlquote 5from urllib.parse import unquote as urlunquote 6 7import pytest 8 9from .. import assert_item_equals 10from .. import EVENT_TEMPLATE 11from .. import normalize_item 12from .. import TASK_TEMPLATE 13from .. import VCARD_TEMPLATE 14from vdirsyncer import exceptions 15from vdirsyncer.storage.base import normalize_meta_value 16from vdirsyncer.vobject import Item 17 18 19def get_server_mixin(server_name): 20 from . import __name__ as base 21 22 x = __import__(f"{base}.servers.{server_name}", fromlist=[""]) 23 return x.ServerMixin 24 25 26def format_item(item_template, uid=None): 27 # assert that special chars are handled correctly. 28 r = random.random() 29 return Item(item_template.format(r=r, uid=uid or r)) 30 31 32class StorageTests: 33 storage_class = None 34 supports_collections = True 35 supports_metadata = True 36 37 @pytest.fixture(params=["VEVENT", "VTODO", "VCARD"]) 38 def item_type(self, request): 39 """Parametrize with all supported item types.""" 40 return request.param 41 42 @pytest.fixture 43 def get_storage_args(self): 44 """ 45 Return a function with the following properties: 46 47 :param collection: The name of the collection to create and use. 48 """ 49 raise NotImplementedError() 50 51 @pytest.fixture 52 def s(self, get_storage_args): 53 return self.storage_class(**get_storage_args()) 54 55 @pytest.fixture 56 def get_item(self, item_type): 57 template = { 58 "VEVENT": EVENT_TEMPLATE, 59 "VTODO": TASK_TEMPLATE, 60 "VCARD": VCARD_TEMPLATE, 61 }[item_type] 62 63 return lambda **kw: format_item(template, **kw) 64 65 @pytest.fixture 66 def requires_collections(self): 67 if not self.supports_collections: 68 pytest.skip("This storage does not support collections.") 69 70 @pytest.fixture 71 def requires_metadata(self): 72 if not self.supports_metadata: 73 pytest.skip("This storage does not support metadata.") 74 75 def test_generic(self, s, get_item): 76 items = [get_item() for i in range(1, 10)] 77 hrefs = [] 78 for item in items: 79 href, etag = s.upload(item) 80 if etag is None: 81 _, etag = s.get(href) 82 hrefs.append((href, etag)) 83 hrefs.sort() 84 assert hrefs == sorted(s.list()) 85 for href, etag in hrefs: 86 assert isinstance(href, (str, bytes)) 87 assert isinstance(etag, (str, bytes)) 88 assert s.has(href) 89 item, etag2 = s.get(href) 90 assert etag == etag2 91 92 def test_empty_get_multi(self, s): 93 assert list(s.get_multi([])) == [] 94 95 def test_get_multi_duplicates(self, s, get_item): 96 href, etag = s.upload(get_item()) 97 if etag is None: 98 _, etag = s.get(href) 99 ((href2, item, etag2),) = s.get_multi([href] * 2) 100 assert href2 == href 101 assert etag2 == etag 102 103 def test_upload_already_existing(self, s, get_item): 104 item = get_item() 105 s.upload(item) 106 with pytest.raises(exceptions.PreconditionFailed): 107 s.upload(item) 108 109 def test_upload(self, s, get_item): 110 item = get_item() 111 href, etag = s.upload(item) 112 assert_item_equals(s.get(href)[0], item) 113 114 def test_update(self, s, get_item): 115 item = get_item() 116 href, etag = s.upload(item) 117 if etag is None: 118 _, etag = s.get(href) 119 assert_item_equals(s.get(href)[0], item) 120 121 new_item = get_item(uid=item.uid) 122 new_etag = s.update(href, new_item, etag) 123 if new_etag is None: 124 _, new_etag = s.get(href) 125 # See https://github.com/pimutils/vdirsyncer/issues/48 126 assert isinstance(new_etag, (bytes, str)) 127 assert_item_equals(s.get(href)[0], new_item) 128 129 def test_update_nonexisting(self, s, get_item): 130 item = get_item() 131 with pytest.raises(exceptions.PreconditionFailed): 132 s.update("huehue", item, '"123"') 133 134 def test_wrong_etag(self, s, get_item): 135 item = get_item() 136 href, etag = s.upload(item) 137 with pytest.raises(exceptions.PreconditionFailed): 138 s.update(href, item, '"lolnope"') 139 with pytest.raises(exceptions.PreconditionFailed): 140 s.delete(href, '"lolnope"') 141 142 def test_delete(self, s, get_item): 143 href, etag = s.upload(get_item()) 144 s.delete(href, etag) 145 assert not list(s.list()) 146 147 def test_delete_nonexisting(self, s, get_item): 148 with pytest.raises(exceptions.PreconditionFailed): 149 s.delete("1", '"123"') 150 151 def test_list(self, s, get_item): 152 assert not list(s.list()) 153 href, etag = s.upload(get_item()) 154 if etag is None: 155 _, etag = s.get(href) 156 assert list(s.list()) == [(href, etag)] 157 158 def test_has(self, s, get_item): 159 assert not s.has("asd") 160 href, etag = s.upload(get_item()) 161 assert s.has(href) 162 assert not s.has("asd") 163 s.delete(href, etag) 164 assert not s.has(href) 165 166 def test_update_others_stay_the_same(self, s, get_item): 167 info = {} 168 for _ in range(4): 169 href, etag = s.upload(get_item()) 170 if etag is None: 171 _, etag = s.get(href) 172 info[href] = etag 173 174 assert { 175 href: etag 176 for href, item, etag in s.get_multi(href for href, etag in info.items()) 177 } == info 178 179 def test_repr(self, s, get_storage_args): 180 assert self.storage_class.__name__ in repr(s) 181 assert s.instance_name is None 182 183 def test_discover(self, requires_collections, get_storage_args, get_item): 184 collections = set() 185 for i in range(1, 5): 186 collection = f"test{i}" 187 s = self.storage_class(**get_storage_args(collection=collection)) 188 assert not list(s.list()) 189 s.upload(get_item()) 190 collections.add(s.collection) 191 192 actual = { 193 c["collection"] 194 for c in self.storage_class.discover(**get_storage_args(collection=None)) 195 } 196 197 assert actual >= collections 198 199 def test_create_collection(self, requires_collections, get_storage_args, get_item): 200 if getattr(self, "dav_server", "") in ("icloud", "fastmail", "davical"): 201 pytest.skip("Manual cleanup would be necessary.") 202 if getattr(self, "dav_server", "") == "radicale": 203 pytest.skip("Radicale does not support collection creation") 204 205 args = get_storage_args(collection=None) 206 args["collection"] = "test" 207 208 s = self.storage_class(**self.storage_class.create_collection(**args)) 209 210 href = s.upload(get_item())[0] 211 assert href in (href for href, etag in s.list()) 212 213 def test_discover_collection_arg(self, requires_collections, get_storage_args): 214 args = get_storage_args(collection="test2") 215 with pytest.raises(TypeError) as excinfo: 216 list(self.storage_class.discover(**args)) 217 218 assert "collection argument must not be given" in str(excinfo.value) 219 220 def test_collection_arg(self, get_storage_args): 221 if self.storage_class.storage_name.startswith("etesync"): 222 pytest.skip("etesync uses UUIDs.") 223 224 if self.supports_collections: 225 s = self.storage_class(**get_storage_args(collection="test2")) 226 # Can't do stronger assertion because of radicale, which needs a 227 # fileextension to guess the collection type. 228 assert "test2" in s.collection 229 else: 230 with pytest.raises(ValueError): 231 self.storage_class(collection="ayy", **get_storage_args()) 232 233 def test_case_sensitive_uids(self, s, get_item): 234 if s.storage_name == "filesystem": 235 pytest.skip("Behavior depends on the filesystem.") 236 237 uid = str(uuid.uuid4()) 238 s.upload(get_item(uid=uid.upper())) 239 s.upload(get_item(uid=uid.lower())) 240 items = [href for href, etag in s.list()] 241 assert len(items) == 2 242 assert len(set(items)) == 2 243 244 def test_specialchars( 245 self, monkeypatch, requires_collections, get_storage_args, get_item 246 ): 247 if getattr(self, "dav_server", "") == "radicale": 248 pytest.skip("Radicale is fundamentally broken.") 249 if getattr(self, "dav_server", "") in ("icloud", "fastmail"): 250 pytest.skip("iCloud and FastMail reject this name.") 251 252 monkeypatch.setattr("vdirsyncer.utils.generate_href", lambda x: x) 253 254 uid = "test @ foo ät bar град сатану" 255 collection = "test @ foo ät bar" 256 257 s = self.storage_class(**get_storage_args(collection=collection)) 258 item = get_item(uid=uid) 259 260 href, etag = s.upload(item) 261 item2, etag2 = s.get(href) 262 if etag is not None: 263 assert etag2 == etag 264 assert_item_equals(item2, item) 265 266 ((_, etag3),) = s.list() 267 assert etag2 == etag3 268 269 # etesync uses UUIDs for collection names 270 if self.storage_class.storage_name.startswith("etesync"): 271 return 272 273 assert collection in urlunquote(s.collection) 274 if self.storage_class.storage_name.endswith("dav"): 275 assert urlquote(uid, "/@:") in href 276 277 def test_metadata(self, requires_metadata, s): 278 if not getattr(self, "dav_server", ""): 279 assert not s.get_meta("color") 280 assert not s.get_meta("displayname") 281 282 try: 283 s.set_meta("color", None) 284 assert not s.get_meta("color") 285 s.set_meta("color", "#ff0000") 286 assert s.get_meta("color") == "#ff0000" 287 except exceptions.UnsupportedMetadataError: 288 pass 289 290 for x in ("hello world", "hello wörld"): 291 s.set_meta("displayname", x) 292 rv = s.get_meta("displayname") 293 assert rv == x 294 assert isinstance(rv, str) 295 296 @pytest.mark.parametrize( 297 "value", 298 [ 299 None, 300 "", 301 "Hello there!", 302 "Österreich", 303 "中国", 304 "한글", 305 "42a4ec99-b1c2-4859-b142-759112f2ca50", 306 "فلسطين", 307 ], 308 ) 309 def test_metadata_normalization(self, requires_metadata, s, value): 310 x = s.get_meta("displayname") 311 assert x == normalize_meta_value(x) 312 313 if not getattr(self, "dav_server", None): 314 # ownCloud replaces "" with "unnamed" 315 s.set_meta("displayname", value) 316 assert s.get_meta("displayname") == normalize_meta_value(value) 317 318 def test_recurring_events(self, s, item_type): 319 if item_type != "VEVENT": 320 pytest.skip("This storage instance doesn't support iCalendar.") 321 322 uid = str(uuid.uuid4()) 323 item = Item( 324 textwrap.dedent( 325 """ 326 BEGIN:VCALENDAR 327 VERSION:2.0 328 BEGIN:VEVENT 329 DTSTART;TZID=UTC:20140325T084000Z 330 DTEND;TZID=UTC:20140325T101000Z 331 DTSTAMP:20140327T060506Z 332 UID:{uid} 333 RECURRENCE-ID;TZID=UTC:20140325T083000Z 334 CREATED:20131216T033331Z 335 DESCRIPTION: 336 LAST-MODIFIED:20140327T060215Z 337 LOCATION: 338 SEQUENCE:1 339 STATUS:CONFIRMED 340 SUMMARY:test Event 341 TRANSP:OPAQUE 342 END:VEVENT 343 BEGIN:VEVENT 344 DTSTART;TZID=UTC:20140128T083000Z 345 DTEND;TZID=UTC:20140128T100000Z 346 RRULE:FREQ=WEEKLY;UNTIL=20141208T213000Z;BYDAY=TU 347 DTSTAMP:20140327T060506Z 348 UID:{uid} 349 CREATED:20131216T033331Z 350 DESCRIPTION: 351 LAST-MODIFIED:20140222T101012Z 352 LOCATION: 353 SEQUENCE:0 354 STATUS:CONFIRMED 355 SUMMARY:Test event 356 TRANSP:OPAQUE 357 END:VEVENT 358 END:VCALENDAR 359 """.format( 360 uid=uid 361 ) 362 ).strip() 363 ) 364 365 href, etag = s.upload(item) 366 367 item2, etag2 = s.get(href) 368 assert normalize_item(item) == normalize_item(item2) 369