1#!/usr/bin/python
2
3import StringIO
4import sys
5import unittest
6import vobject
7import vobject.ics_diff
8import webdavlib
9import xml.sax.saxutils
10
class ics_compare():
    """Compare two iCalendar texts and render their differences as text.

    The comparison itself is delegated to ``vobject.ics_diff.diff``.  Its
    result is kept in ``self.diffs`` (``None`` until ``areEqual`` has been
    called) so that ``textDiff`` can render it afterwards.
    """

    def __init__(self, event1, event2):
        """Store the two iCalendar texts to compare.

        event1, event2 -- the raw texts; each is wrapped in a StringIO
                          stream when ``areEqual`` parses it.
        """
        self.event1 = event1
        self.event2 = event2
        self.diffs = None

    def _vcalendarComponent(self, event):
        """Return the VCALENDAR component read from the stream *event*.

        All components are read; when several are named "VCALENDAR" the
        last one wins.  Returns None when there is none.
        """
        event_component = None
        for item in vobject.readComponents(event):
            if item.name == "VCALENDAR":
                event_component = item
        return event_component

    def areEqual(self):
        """Diff both events and return True when no difference was found.

        The diff is stored in ``self.diffs`` as a side effect.
        Raises Exception when either text has no VCALENDAR component.
        """
        s_event1 = StringIO.StringIO(self.event1)
        s_event2 = StringIO.StringIO(self.event2)

        event1_vcalendar = self._vcalendarComponent(s_event1)
        if event1_vcalendar is None:
            raise Exception("No VCALENDAR component in event1")

        event2_vcalendar = self._vcalendarComponent(s_event2)
        if event2_vcalendar is None:
            raise Exception("No VCALENDAR component in event2")

        self.diffs = vobject.ics_diff.diff(event1_vcalendar, event2_vcalendar)
        return not self.diffs

    def textDiff(self):
        """Return the pretty-printed diff computed by ``areEqual``.

        Returns an empty string when ``areEqual`` has not been called yet or
        found no difference.  ``prettyPrint`` writes to standard output, so
        ``sys.stdout`` is temporarily swapped for an in-memory buffer and
        always restored, even when printing fails.
        """
        saved_stdout = sys.stdout
        out = StringIO.StringIO()
        sys.stdout = out
        try:
            if self.diffs is not None:
                for (left, right) in self.diffs:
                    for side in (left, right):
                        # ics_diff.diff documents None for an object that is
                        # missing from one of the two calendars; there is
                        # nothing to print for that side.
                        if side is not None:
                            side.prettyPrint()
        finally:
            sys.stdout = saved_stdout

        return out.getvalue().strip()
56
57
class TestUtility():
    """Helper shared by DAV tests: looks up and caches basic user details.

    test   -- object providing the assertion methods (assertEquals is used)
    client -- object whose ``execute`` method runs a webdavlib query
    """

    def __init__(self, test, client, resource = None):
        # "resource" is accepted but not stored here; TestACLUtility passes
        # it through and keeps its own copy.
        self.test = test
        self.client = client
        # cache: login -> (display name, first calendar-user-address href)
        self.userInfo = {}

    def fetchUserInfo(self, login):
        """Return ``(displayName, email)`` for *login*.

        The first call for a login issues a depth-0 PROPFIND on
        "/SOGo/dav/<login>/" and asserts a 207 status; the result is cached
        in ``self.userInfo`` and reused by later calls.  The second tuple
        member is the text of the first calendar-user-address-set href.

        Raises IndexError when the response lacks either property.
        """
        if login not in self.userInfo:
            resource = "/SOGo/dav/%s/" % login
            propfind = webdavlib.WebDAVPROPFIND(resource,
                                                ["displayname",
                                                 "{urn:ietf:params:xml:ns:caldav}calendar-user-address-set"],
                                                0)
            self.client.execute(propfind)
            self.test.assertEquals(propfind.response["status"], 207)
            common_tree = "{DAV:}response/{DAV:}propstat/{DAV:}prop"
            name_nodes = propfind.response["document"] \
                         .findall('%s/{DAV:}displayname' % common_tree)
            email_nodes = propfind.response["document"] \
                          .findall('%s/{urn:ietf:params:xml:ns:caldav}calendar-user-address-set/{DAV:}href'
                                   % common_tree)

            # NOTE(review): assuming an ElementTree document, ".text" is None
            # (not "") for an empty element, so len() must not be applied to
            # it; fall back to "" in both cases.
            displayName = name_nodes[0].text or ""
            self.userInfo[login] = (displayName, email_nodes[0].text)

        return self.userInfo[login]
87
class TestACLUtility(TestUtility):
    """Helper issuing subscription and ACL queries against one resource.

    Every query is an XML document in the
    "urn:inverse:params:xml:ns:inverse-dav" namespace, POSTed to
    ``self.resource``.  Subclasses provide ``rightsToSOGoRights`` to
    translate their short rights notation into SOGo role names.
    """

    def __init__(self, test, client, resource):
        TestUtility.__init__(self, test, client, resource)
        self.resource = resource

    def _escapeAttribute(self, value):
        """Escape *value* for use inside a double-quoted XML attribute."""
        # escape() only handles &, < and > by default; the double quote must
        # be added or it would terminate the attribute early.
        return xml.sax.saxutils.escape(value, {"\"": "&quot;"})

    def _postQuery(self, query):
        """POST the XML document *query* to the resource; return the status."""
        post = webdavlib.HTTPPOST(self.resource, query)
        post.content_type = "application/xml; charset=\"utf-8\""
        self.client.execute(post)
        return post.response["status"]

    def _subscriptionOperation(self, subscribers, operation):
        """POST a <subscribe/> or <unsubscribe/> query and assert status 200.

        subscribers -- iterable of user names sent as a comma-separated
                       "users" attribute, or None to omit the attribute
        operation   -- name of the query element
        """
        subscribeQuery = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
                          + "<%s" % operation
                          + " xmlns=\"urn:inverse:params:xml:ns:inverse-dav\"")
        if subscribers is not None:
            subscribeQuery = (subscribeQuery
                              + " users=\"%s\""
                              % self._escapeAttribute(",".join(subscribers)))
        subscribeQuery = subscribeQuery + "/>"
        status = self._postQuery(subscribeQuery)
        # The message is built before the assertion runs, so it must cope
        # with subscribers being None (the default of subscribe/unsubscribe).
        self.test.assertEquals(status, 200,
                               "subscribtion failure to '%s' for '%s' (status: %d)"
                               % (self.resource,
                                  "', '".join(subscribers or []),
                                  status))

    def subscribe(self, subscribers=None):
        """Send a "subscribe" query, optionally naming *subscribers*."""
        self._subscriptionOperation(subscribers, "subscribe")

    def unsubscribe(self, subscribers=None):
        """Send an "unsubscribe" query, optionally naming *subscribers*."""
        self._subscriptionOperation(subscribers, "unsubscribe")

    def rightsToSOGoRights(self, rights):
        """Translate *rights* into a list of SOGo role names.

        Must be overridden; this base version fails the running test.
        """
        # The test case lives in self.test; this class has no "fail" method
        # of its own.
        self.test.fail("subclass must implement this method")

    def setupRights(self, username, rights = None):
        """Set the roles of *username* on the resource, or remove the user.

        rights -- value understood by ``rightsToSOGoRights``; each returned
                  role becomes an empty element inside <set-roles>.  When
                  None, a <remove-user> query is sent instead.

        Asserts that the server answers with status 204.
        """
        user = self._escapeAttribute(username)
        if rights is not None:
            rights_str = "".join(["<%s/>"
                                  % x for x in self.rightsToSOGoRights(rights)])
            action = "<set-roles user=\"%s\">%s</set-roles>" % (user,
                                                                rights_str)
        else:
            action = "<remove-user user=\"%s\"/>" % user
        aclQuery = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
                    + "<acl-query"
                    + " xmlns=\"urn:inverse:params:xml:ns:inverse-dav\">"
                    + action
                    + "</acl-query>")

        status = self._postQuery(aclQuery)

        if rights is None:
            err_msg = ("rights modification: failure to remove entry (status: %d)"
                       % status)
        else:
            err_msg = ("rights modification: failure to set '%s' (status: %d)"
                       % (rights_str, status))
        self.test.assertEquals(status, 204, err_msg)
146
147# Calendar:
148#   rights:
149#     v: view all
150#     d: view date and time
151#     m: modify
152#     r: respond
153#   short rights notation: { "c": create,
154#                            "d": delete,
155#                            "pu": public,
156#                            "pr": private,
157#                            "co": confidential }
class TestCalendarACLUtility(TestACLUtility):
    """ACL helper translating the short calendar rights notation."""

    def rightsToSOGoRights(self, rights):
        """Return the SOGo role names matching the dict *rights*.

        Keys "c" and "d" are flags: a true value adds "ObjectCreator" and
        "ObjectEraser" respectively.  Keys "pu", "pr" and "co" select the
        Public, Private and Confidential classes; their value is one of
        "v", "d", "m" or "r" and yields e.g. "PublicViewer".

        Raises KeyError when a class key carries an unknown right letter.
        """
        sogoRights = []
        # get() covers both "key absent" and "key present but false".
        if rights.get("c"):
            sogoRights.append("ObjectCreator")
        if rights.get("d"):
            sogoRights.append("ObjectEraser")

        classes = { "pu": "Public",
                    "pr": "Private",
                    "co": "Confidential" }
        rights_table = { "v": "Viewer",
                         "d": "DAndTViewer",
                         "m": "Modifier",
                         "r": "Responder" }
        for k in classes:
            if k in rights:
                sogoRights.append("%s%s" % (classes[k],
                                            rights_table[rights[k]]))

        return sogoRights
180
181# Addressbook:
182#   short rights notation: { "c": create,
183#                            "d": delete,
184#                            "e": edit,
185#                            "v": view }
class TestAddressBookACLUtility(TestACLUtility):
    """ACL helper translating the short address book rights notation."""

    def rightsToSOGoRights(self, rights):
        """Return the SOGo role names enabled in the dict *rights*.

        Keys are "c", "d", "v" and "e"; a role is included only when its
        value is true, matching how TestCalendarACLUtility treats its "c"
        and "d" flags.

        Raises KeyError for any other key, whatever its value.
        """
        sogoRightsTable = { "c": "ObjectCreator",
                            "d": "ObjectEraser",
                            "v": "ObjectViewer",
                            "e": "ObjectEditor" }

        sogoRights = []
        for k in rights:
            # Look the key up first so that unknown keys are still reported,
            # even when their flag is false.
            sogoRight = sogoRightsTable[k]
            if rights[k]:
                sogoRights.append(sogoRight)

        return sogoRights
198
199
200