1import contextlib
2import ctypes
3import struct
4from ctypes import c_void_p, c_uint16, c_uint32, c_int32, c_char_p, POINTER
5
6from keyring.py27compat import string_types, add_metaclass
7
# On Python 2 a module-level ``__metaclass__`` makes the classes defined
# in this module new-style; Python 3 does not consult this name.
__metaclass__ = type


# Keychain and keychain-item references are both modeled as untyped
# pointers, so the two names are aliases of the same ctypes type.
sec_keychain_ref = sec_keychain_item_ref = c_void_p
# Signed 32-bit type used as ``restype`` for the functions bound in this
# module.
OS_status = c_int32
13
14
class error:
    """
    Status codes that ``Error.raise_for_status`` translates into
    dedicated exception types.
    """
    keychain_denied = -128
    sec_auth_failed = -25293
    item_not_found = -25300
    plist_missing = -67030
20
21
# ``fw(name=...)`` expands to the path of a system framework's binary.
fw = '/System/Library/Frameworks/{name}.framework/Versions/A/{name}'.format
# Both libraries are loaded when this module is imported; the functions
# bound in this module are looked up on ``_sec``.
_sec = ctypes.CDLL(fw(name='Security'))
# In this file ``_core`` is only used to call ``CFRelease``.
_core = ctypes.CDLL(fw(name='CoreServices'))
25
26
# Prototype for SecKeychainOpen, called by ``open`` when a keychain name
# is given.
SecKeychainOpen = _sec.SecKeychainOpen
SecKeychainOpen.argtypes = (
    c_char_p,  # keychain name as encoded bytes
    POINTER(sec_keychain_ref),  # out: receives the keychain reference
)
SecKeychainOpen.restype = OS_status


# Prototype for SecKeychainCopyDefault, called by ``open`` when the name
# is None.
SecKeychainCopyDefault = _sec.SecKeychainCopyDefault
# The trailing comma makes this a one-element tuple.
SecKeychainCopyDefault.argtypes = POINTER(sec_keychain_ref),
SecKeychainCopyDefault.restype = OS_status
38
39
class Error(Exception):
    """
    Base exception for failures reported through a status code.

    Instances are created with ``(status, message)`` as arguments.
    """

    @classmethod
    def raise_for_status(cls, status):
        """
        Return None when *status* is 0; otherwise raise.

        Known codes from ``error`` raise NotFound, KeychainDenied or
        SecAuthFailure. Any other non-zero status raises *cls* itself
        with the message "Unknown Error".
        """
        if status == 0:
            return

        auth_message = (
            "Security Auth Failure: make sure "
            "python is signed with codesign util"
        )
        # (status code, exception type, message), checked in order.
        known = (
            (error.item_not_found, NotFound, "Item not found"),
            (error.keychain_denied, KeychainDenied, "Keychain Access Denied"),
            (error.sec_auth_failed, SecAuthFailure, auth_message),
            (error.plist_missing, SecAuthFailure, auth_message),
        )
        for code, exc_type, message in known:
            if status == code:
                raise exc_type(status, message)

        raise cls(status, "Unknown Error")
53
54
class NotFound(Error):
    """
    Raised by ``Error.raise_for_status`` for ``error.item_not_found``.
    """
57
58
class KeychainDenied(Error):
    """
    Raised by ``Error.raise_for_status`` for ``error.keychain_denied``.
    """
61
62
class SecAuthFailure(Error):
    """
    Raised by ``Error.raise_for_status`` for ``error.sec_auth_failed``
    and ``error.plist_missing``.
    """
65
66
@contextlib.contextmanager
def open(name):
    """
    Context manager yielding a keychain reference.

    When *name* is None the default keychain is obtained with
    SecKeychainCopyDefault; otherwise *name* is UTF-8 encoded and passed
    to SecKeychainOpen. A non-zero status raises via
    ``Error.raise_for_status``. The reference is released with
    ``CFRelease`` when the ``with`` block exits, including on error.
    """
    keychain = sec_keychain_ref()
    if name is not None:
        result = SecKeychainOpen(name.encode('utf-8'), keychain)
    else:
        result = SecKeychainCopyDefault(keychain)
    Error.raise_for_status(result)

    try:
        yield keychain
    finally:
        _core.CFRelease(keychain)
79
80
# Prototype for SecKeychainFindGenericPassword. The unlabeled arguments
# are annotated from how the functions in this module call it.
SecKeychainFindGenericPassword = _sec.SecKeychainFindGenericPassword
SecKeychainFindGenericPassword.argtypes = (
    sec_keychain_ref,  # keychain to search
    c_uint32,  # length of the encoded service name
    c_char_p,  # service name
    c_uint32,  # length of the encoded username
    c_char_p,  # username
    POINTER(c_uint32),  # passwordLength
    POINTER(c_void_p),  # passwordData
    POINTER(sec_keychain_item_ref),  # itemRef
)
SecKeychainFindGenericPassword.restype = OS_status
93
94
def find_generic_password(kc_name, service, username):
    """
    Return the generic password stored for *service* and *username*.

    *kc_name* is handed to ``open`` (None selects the default keychain).
    The service and username are UTF-8 encoded for the lookup, and the
    password bytes are decoded as UTF-8 before being returned.

    Raises via ``Error.raise_for_status`` when the lookup reports a
    non-zero status.
    """
    username = username.encode('utf-8')
    service = service.encode('utf-8')
    pw_length = c_uint32()
    pw_data = c_void_p()

    with open(kc_name) as keychain:
        status = SecKeychainFindGenericPassword(
            keychain,
            len(service), service,
            len(username), username,
            pw_length,
            pw_data,
            None,  # the item reference is not requested
        )

    Error.raise_for_status(status)

    # Copy the bytes into our own buffer before handing the
    # framework-owned memory back with SecKeychainItemFreeContent.
    size = pw_length.value
    buf = ctypes.create_string_buffer(size)
    ctypes.memmove(buf, pw_data.value, size)
    SecKeychainItemFreeContent(None, pw_data)
    return buf.raw.decode('utf-8')
118
119
# Prototype for SecKeychainFindInternetPassword; the per-argument
# comments give the parameter each ctypes type stands for. The last
# three arguments are pointer (output) parameters.
SecKeychainFindInternetPassword = _sec.SecKeychainFindInternetPassword
SecKeychainFindInternetPassword.argtypes = (
    sec_keychain_ref,  # keychainOrArray
    c_uint32,  # serverNameLength
    c_char_p,  # serverName
    c_uint32,  # securityDomainLength
    c_char_p,  # securityDomain
    c_uint32,  # accountNameLength
    c_char_p,  # accountName
    c_uint32,  # pathLength
    c_char_p,  # path
    c_uint16,  # port
    c_uint32,  # SecProtocolType protocol,
    c_uint32,  # SecAuthenticationType authenticationType,
    POINTER(c_uint32),  # passwordLength
    POINTER(c_void_p),  # passwordData
    POINTER(sec_keychain_item_ref),  # itemRef
)
SecKeychainFindInternetPassword.restype = OS_status
139
140
class PackedAttributes(type):
    """
    Metaclass that converts class attributes written as
    four-character "magic words" into the integer constants
    they encode.

    Attributes whose names start with an underscore are left
    untouched.
    """
    def __new__(cls, name, bases, dict):
        # Iterate over a snapshot so the namespace can be rewritten
        # in place while looping.
        for key, val in list(dict.items()):
            if key.startswith('_'):
                continue
            dict[key] = cls.unpack(val)
        return super(PackedAttributes, cls).__new__(cls, name, bases, dict)

    @staticmethod
    def unpack(word):
        r"""
        >>> PackedAttributes.unpack(0)
        0
        >>> PackedAttributes.unpack('\x00\x00\x00\x01')
        1
        >>> PackedAttributes.unpack('abcd')
        1633837924
        """
        if isinstance(word, string_types):
            # Interpret the ASCII bytes as one big-endian unsigned
            # 32-bit integer.
            return struct.unpack('!I', word.encode('ascii'))[0]
        return word
169
170
# Protocol constants written as four-character codes. PackedAttributes
# replaces each string with its big-endian 32-bit integer value when the
# class is created.
@add_metaclass(PackedAttributes)
class SecProtocolType:
    kSecProtocolTypeHTTP = 'http'
    kSecProtocolTypeHTTPS = 'htps'
    # Trailing space pads the code to the four bytes unpack() requires.
    kSecProtocolTypeFTP = 'ftp '
176
177
# Authentication-type constants; string values are converted to integers
# by PackedAttributes when the class is created.
@add_metaclass(PackedAttributes)
class SecAuthenticationType:
    """
    >>> SecAuthenticationType.kSecAuthenticationTypeDefault
    1684434036
    """
    kSecAuthenticationTypeDefault = 'dflt'
    # Non-string values pass through PackedAttributes.unpack unchanged.
    kSecAuthenticationTypeAny = 0
186
187
def find_internet_password(kc_name, service, username):
    """
    Return the internet password stored for *service* and *username*.

    *kc_name* is handed to ``open`` (None selects the default keychain).
    *service* is passed as the server name and *username* as the account
    name, both UTF-8 encoded. The lookup supplies no security domain or
    path, uses port 0, the HTTPS protocol constant and the "any"
    authentication type. The password bytes are decoded as UTF-8.

    Raises via ``Error.raise_for_status`` when the lookup reports a
    non-zero status.
    """
    username = username.encode('utf-8')
    service = service.encode('utf-8')
    domain = path = None
    port = 0
    pw_length = c_uint32()
    pw_data = c_void_p()

    with open(kc_name) as keychain:
        status = SecKeychainFindInternetPassword(
            keychain,
            len(service), service,
            0, domain,
            len(username), username,
            0, path,
            port,
            SecProtocolType.kSecProtocolTypeHTTPS,
            SecAuthenticationType.kSecAuthenticationTypeAny,
            pw_length,
            pw_data,
            None,  # the item reference is not requested
        )

    Error.raise_for_status(status)

    # Copy the bytes into our own buffer before handing the
    # framework-owned memory back with SecKeychainItemFreeContent.
    size = pw_length.value
    buf = ctypes.create_string_buffer(size)
    ctypes.memmove(buf, pw_data.value, size)
    SecKeychainItemFreeContent(None, pw_data)
    return buf.raw.decode('utf-8')
218
219
# Prototype for SecKeychainAddGenericPassword. The arguments are
# annotated from how ``set_generic_password`` calls it.
SecKeychainAddGenericPassword = _sec.SecKeychainAddGenericPassword
SecKeychainAddGenericPassword.argtypes = (
    sec_keychain_ref,  # keychain to add to
    c_uint32,  # length of the encoded service name
    c_char_p,  # service name
    c_uint32,  # length of the encoded username
    c_char_p,  # username
    c_uint32,  # length of the encoded password
    c_char_p,  # password
    POINTER(sec_keychain_item_ref),  # item reference (callers here pass None)
)
SecKeychainAddGenericPassword.restype = OS_status
232
233
def set_generic_password(name, service, username, password):
    """
    Store *password* for *service* and *username*.

    Looks the item up first: an existing item has its data replaced,
    and a missing item is added. All three text arguments are UTF-8
    encoded. *name* is handed to ``open`` (None selects the default
    keychain).

    Raises via ``Error.raise_for_status`` when the final status is
    non-zero.
    """
    username = username.encode('utf-8')
    service = service.encode('utf-8')
    password = password.encode('utf-8')

    with open(name) as keychain:
        existing = sec_keychain_item_ref()
        status = SecKeychainFindGenericPassword(
            keychain,
            len(service), service,
            len(username), username,
            None, None,  # only the item reference is requested
            existing,
        )

        if not status:
            # Found: overwrite the stored data, then drop our reference.
            status = SecKeychainItemModifyAttributesAndData(
                existing, None, len(password), password)
            _core.CFRelease(existing)
        elif status == error.item_not_found:
            status = SecKeychainAddGenericPassword(
                keychain,
                len(service), service,
                len(username), username,
                len(password), password,
                None,
            )

        # Any other lookup failure falls through and is raised here.
        Error.raise_for_status(status)
258
259
# Prototype for SecKeychainAddInternetPassword; the per-argument comments
# give the parameter each ctypes type stands for. Here the password
# length and data are passed by value, and only itemRef is a pointer
# parameter.
SecKeychainAddInternetPassword = _sec.SecKeychainAddInternetPassword
SecKeychainAddInternetPassword.argtypes = (
    sec_keychain_ref,  # keychainOrArray
    c_uint32,  # serverNameLength
    c_char_p,  # serverName
    c_uint32,  # securityDomainLength
    c_char_p,  # securityDomain
    c_uint32,  # accountNameLength
    c_char_p,  # accountName
    c_uint32,  # pathLength
    c_char_p,  # path
    c_uint16,  # port
    c_uint32,  # SecProtocolType protocol,
    c_uint32,  # SecAuthenticationType authenticationType,
    c_uint32,  # passwordLength
    c_void_p,  # passwordData
    POINTER(sec_keychain_item_ref),  # itemRef
)
SecKeychainAddInternetPassword.restype = OS_status
279
280
def set_internet_password(name, service, username, password):
    """
    Store *password* as an internet password for *service* and
    *username*.

    Uses the same update-or-add technique as ``set_generic_password``:
    the item is looked up first with the criteria that
    ``find_internet_password`` uses; an existing item has its data
    replaced, and a missing item is added. Previously the item was
    always added, so setting a password a second time failed.

    :param name: keychain name handed to ``open``; None selects the
        default keychain.
    :param service: server name (text, UTF-8 encoded for the call).
    :param username: account name (text, UTF-8 encoded for the call).
    :param password: password (text, UTF-8 encoded for the call).
    :raises Error: (or a subclass) via ``Error.raise_for_status`` when
        the final status is non-zero.
    """
    username = username.encode('utf-8')
    domain = None
    service = service.encode('utf-8')
    password = password.encode('utf-8')
    path = None
    port = 0
    with open(name) as keychain:
        item = sec_keychain_item_ref()
        # Look for an existing item; only the item reference is
        # requested, so no password data is returned that would need
        # freeing.
        status = SecKeychainFindInternetPassword(
            keychain,
            len(service), service,
            0, domain,
            len(username), username,
            0, path,
            port,
            SecProtocolType.kSecProtocolTypeHTTPS,
            SecAuthenticationType.kSecAuthenticationTypeAny,
            None,
            None,
            item,
        )
        if status == 0:
            # Found: overwrite the stored data, then drop our reference.
            status = SecKeychainItemModifyAttributesAndData(
                item, None, len(password), password)
            _core.CFRelease(item)
        elif status == error.item_not_found:
            status = SecKeychainAddInternetPassword(
                keychain,
                len(service), service,
                0, domain,
                len(username), username,
                0, path,
                port,
                SecProtocolType.kSecProtocolTypeHTTPS,
                SecAuthenticationType.kSecAuthenticationTypeAny,
                len(password), password,
                None,
            )

        # Any other lookup failure falls through and is raised here.
        Error.raise_for_status(status)
304
305
# Prototype for SecKeychainItemModifyAttributesAndData. In this file it
# is called as (item, None, len(password), password) to replace an
# item's stored data.
SecKeychainItemModifyAttributesAndData = (
    _sec.SecKeychainItemModifyAttributesAndData)
SecKeychainItemModifyAttributesAndData.argtypes = (
    sec_keychain_item_ref, c_void_p, c_uint32, c_void_p,
)
SecKeychainItemModifyAttributesAndData.restype = OS_status

# Prototype for SecKeychainItemFreeContent. In this file it is called as
# (None, data) to release password data returned by a find call.
SecKeychainItemFreeContent = _sec.SecKeychainItemFreeContent
SecKeychainItemFreeContent.argtypes = (
    c_void_p, c_void_p,
)
SecKeychainItemFreeContent.restype = OS_status

# Prototype for SecKeychainItemDelete; takes the item reference to
# delete. The trailing comma makes argtypes a one-element tuple.
SecKeychainItemDelete = _sec.SecKeychainItemDelete
SecKeychainItemDelete.argtypes = sec_keychain_item_ref,
SecKeychainItemDelete.restype = OS_status
322
323
def delete_generic_password(name, service, username):
    """
    Delete the generic password stored for *service* and *username*.

    :param name: keychain name handed to ``open``; None selects the
        default keychain.
    :param service: service name (text, UTF-8 encoded for the call).
    :param username: account name (text, UTF-8 encoded for the call).
    :raises Error: (or a subclass such as NotFound) via
        ``Error.raise_for_status`` when either the lookup or the
        deletion reports a non-zero status.
    """
    username = username.encode('utf-8')
    service = service.encode('utf-8')
    with open(name) as keychain:
        item = sec_keychain_item_ref()
        # Only the item reference is needed. Passing None for the
        # password length and data (as set_generic_password does) avoids
        # receiving password data that would have to be released with
        # SecKeychainItemFreeContent; the previous code requested it and
        # never freed it.
        status = SecKeychainFindGenericPassword(
            keychain,
            len(service),
            service,
            len(username),
            username,
            None,
            None,
            item,
        )

    Error.raise_for_status(status)

    status = SecKeychainItemDelete(item)
    # Drop our reference before reporting, so it is released even when
    # the deletion failed.
    _core.CFRelease(item)
    # Report a failed deletion instead of silently ignoring it.
    Error.raise_for_status(status)
346