1#!/usr/bin/env python 2 3# 4# Copyright 2011 Stef Walter 5# 6# This program is free software: you can redistribute it and/or modify 7# it under the terms of the GNU Lesser General Public License as published 8# by the Free Software Foundation; either version 2 of the licence or (at 9# your option) any later version. 10# 11# See the included COPYING file for more information. 12# 13 14import getopt 15import os 16import sys 17import time 18import unittest 19 20import aes 21import dh 22import hkdf 23 24import dbus 25import dbus.service 26import dbus.glib 27import gobject 28 29COLLECTION_PREFIX = "/org/freedesktop/secrets/collection/" 30 31bus_name = 'org.freedesktop.Secret.MockService' 32ready_pipe = -1 33objects = { } 34 35class NotSupported(dbus.exceptions.DBusException): 36 def __init__(self, msg): 37 dbus.exceptions.DBusException.__init__(self, msg, name="org.freedesktop.DBus.Error.NotSupported") 38 39class InvalidArgs(dbus.exceptions.DBusException): 40 def __init__(self, msg): 41 dbus.exceptions.DBusException.__init__(self, msg, name="org.freedesktop.DBus.Error.InvalidArgs") 42 43class IsLocked(dbus.exceptions.DBusException): 44 def __init__(self, msg): 45 dbus.exceptions.DBusException.__init__(self, msg, name="org.freedesktop.Secret.Error.IsLocked") 46 47class NoSuchObject(dbus.exceptions.DBusException): 48 def __init__(self, msg): 49 dbus.exceptions.DBusException.__init__(self, msg, name="org.freedesktop.Secret.Error.NoSuchObject") 50 51 52unique_identifier = 0 53def next_identifier(prefix=''): 54 global unique_identifier 55 unique_identifier += 1 56 return "%s%d" % (prefix, unique_identifier) 57 58def encode_identifier(value): 59 return "".join([(c.isalpha() or c.isdigit()) and c or "_%02x" % ord(c) \ 60 for c in value.encode('utf-8')]) 61 62def hex_encode(string): 63 return "".join([hex(ord(c))[2:].zfill(2) for c in string]) 64 65def alias_path(name): 66 return "/org/freedesktop/secrets/aliases/%s" % name 67 68class PlainAlgorithm(): 69 def negotiate(self, service, sender, param): 70 if type (param) != dbus.String: 71 raise InvalidArgs("invalid argument passed to OpenSession") 72 session = SecretSession(service, sender, self, None) 73 return (dbus.String("", variant_level=1), session) 74 75 def encrypt(self, key, data): 76 return ("", data) 77 78 def decrypt(self, param, data): 79 if params == "": 80 raise InvalidArgs("invalid secret plain parameter") 81 return data 82 83 84class AesAlgorithm(): 85 def negotiate(self, service, sender, param): 86 if type (param) != dbus.ByteArray: 87 raise InvalidArgs("invalid argument passed to OpenSession") 88 privat, publi = dh.generate_pair() 89 peer = dh.bytes_to_number(param) 90 # print "mock publi: ", hex(publi) 91 # print " mock peer: ", hex(peer) 92 ikm = dh.derive_key(privat, peer) 93 # print " mock ikm: ", hex_encode(ikm) 94 key = hkdf.hkdf(ikm, 16) 95 # print " mock key: ", hex_encode(key) 96 session = SecretSession(service, sender, self, key) 97 return (dbus.ByteArray(dh.number_to_bytes(publi), variant_level=1), session) 98 99 def encrypt(self, key, data): 100 key = map(ord, key) 101 data = aes.append_PKCS7_padding(data) 102 keysize = len(key) 103 iv = [ord(i) for i in os.urandom(16)] 104 mode = aes.AESModeOfOperation.modeOfOperation["CBC"] 105 moo = aes.AESModeOfOperation() 106 (mode, length, ciph) = moo.encrypt(data, mode, key, keysize, iv) 107 return ("".join([chr(i) for i in iv]), 108 "".join([chr(i) for i in ciph])) 109 110 def decrypt(self, key, param, data): 111 key = map(ord, key) 112 keysize = len(key) 113 iv = map(ord, param[:16]) 114 data = map(ord, data) 115 moo = aes.AESModeOfOperation() 116 mode = aes.AESModeOfOperation.modeOfOperation["CBC"] 117 decr = moo.decrypt(data, None, mode, key, keysize, iv) 118 return aes.strip_PKCS7_padding(decr) 119 120 121class SecretPrompt(dbus.service.Object): 122 def __init__(self, service, sender, prompt_name=None, delay=0, 123 dismiss=False, action=None): 124 self.sender = sender 125 self.service = service 126 self.delay = 0 127 self.dismiss = False 128 self.result = dbus.String("", variant_level=1) 129 self.action = action 130 self.completed = False 131 if prompt_name: 132 self.path = "/org/freedesktop/secrets/prompts/%s" % prompt_name 133 else: 134 self.path = "/org/freedesktop/secrets/prompts/%s" % next_identifier('p') 135 dbus.service.Object.__init__(self, service.bus_name, self.path) 136 service.add_prompt(self) 137 assert self.path not in objects 138 objects[self.path] = self 139 140 def _complete(self): 141 if self.completed: 142 return 143 self.completed = True 144 self.Completed(self.dismiss, self.result) 145 self.remove_from_connection() 146 147 @dbus.service.method('org.freedesktop.Secret.Prompt') 148 def Prompt(self, window_id): 149 if self.action: 150 self.result = self.action() 151 gobject.timeout_add(self.delay * 1000, self._complete) 152 153 @dbus.service.method('org.freedesktop.Secret.Prompt') 154 def Dismiss(self): 155 self._complete() 156 157 @dbus.service.signal(dbus_interface='org.freedesktop.Secret.Prompt', signature='bv') 158 def Completed(self, dismiss, result): 159 pass 160 161 162class SecretSession(dbus.service.Object): 163 def __init__(self, service, sender, algorithm, key): 164 self.sender = sender 165 self.service = service 166 self.algorithm = algorithm 167 self.key = key 168 self.path = "/org/freedesktop/secrets/sessions/%s" % next_identifier('s') 169 dbus.service.Object.__init__(self, service.bus_name, self.path) 170 service.add_session(self) 171 objects[self.path] = self 172 173 def encode_secret(self, secret, content_type): 174 (params, data) = self.algorithm.encrypt(self.key, secret) 175 # print " mock iv: ", hex_encode(params) 176 # print " mock ciph: ", hex_encode(data) 177 return dbus.Struct((dbus.ObjectPath(self.path), dbus.ByteArray(params), 178 dbus.ByteArray(data), dbus.String(content_type)), 179 signature="oayays") 180 181 def decode_secret(self, value): 182 plain = self.algorithm.decrypt(self.key, value[1], value[2]) 183 return (plain, value[3]) 184 185 @dbus.service.method('org.freedesktop.Secret.Session') 186 def Close(self): 187 self.remove_from_connection() 188 self.service.remove_session(self) 189 190 191class SecretItem(dbus.service.Object): 192 SUPPORTS_MULTIPLE_OBJECT_PATHS = True 193 194 def __init__(self, collection, identifier=None, label="Item", attributes={ }, 195 secret="", confirm=False, content_type="text/plain", 196 type="org.freedesktop.Secret.Generic"): 197 if identifier is None: 198 identifier = next_identifier() 199 identifier = encode_identifier(identifier) 200 self.collection = collection 201 self.identifier = identifier 202 self.label = label or "Unnamed item" 203 self.secret = secret 204 self.type = type 205 self.attributes = attributes 206 self.content_type = content_type 207 self.path = "%s/%s" % (collection.path, identifier) 208 self.confirm = confirm 209 self.created = self.modified = time.time() 210 dbus.service.Object.__init__(self, collection.service.bus_name, self.path) 211 self.collection.add_item(self) 212 objects[self.path] = self 213 214 def add_alias(self, name): 215 path = "%s/%s" % (alias_path(name), self.identifier) 216 objects[path] = self 217 self.add_to_connection(self.connection, path) 218 219 def remove_alias(self, name): 220 path = "%s/%s" % (alias_path(name), self.identifier) 221 del objects[path] 222 self.remove_from_connection(self.connection, path) 223 224 def match_attributes(self, attributes): 225 for (key, value) in attributes.items(): 226 if not self.attributes.get(key) == value: 227 return False 228 return True 229 230 def get_locked(self): 231 return self.collection.locked 232 233 def perform_xlock(self, lock): 234 return self.collection.perform_xlock(lock) 235 236 def perform_delete(self): 237 self.collection.remove_item(self) 238 del objects[self.path] 239 self.remove_from_connection() 240 241 @dbus.service.method('org.freedesktop.Secret.Item', sender_keyword='sender') 242 def GetSecret(self, session_path, sender=None): 243 session = objects.get(session_path, None) 244 if not session or session.sender != sender: 245 raise InvalidArgs("session invalid: %s" % session_path) 246 if self.get_locked(): 247 raise IsLocked("secret is locked: %s" % self.path) 248 return session.encode_secret(self.secret, self.content_type) 249 250 @dbus.service.method('org.freedesktop.Secret.Item', sender_keyword='sender', byte_arrays=True) 251 def SetSecret(self, secret, sender=None): 252 session = objects.get(secret[0], None) 253 if not session or session.sender != sender: 254 raise InvalidArgs("session invalid: %s" % secret[0]) 255 if self.get_locked(): 256 raise IsLocked("secret is locked: %s" % self.path) 257 (self.secret, self.content_type) = session.decode_secret(secret) 258 259 @dbus.service.method('org.freedesktop.Secret.Item', sender_keyword='sender') 260 def Delete(self, sender=None): 261 item = self 262 def prompt_callback(): 263 item.perform_delete() 264 return dbus.String("", variant_level=1) 265 if self.confirm: 266 prompt = SecretPrompt(self.collection.service, sender, 267 dismiss=False, action=prompt_callback) 268 return dbus.ObjectPath(prompt.path) 269 else: 270 self.perform_delete() 271 return dbus.ObjectPath("/") 272 273 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v') 274 def Get(self, interface_name, property_name): 275 return self.GetAll(interface_name)[property_name] 276 277 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}') 278 def GetAll(self, interface_name): 279 if interface_name == 'org.freedesktop.Secret.Item': 280 return { 281 'Locked': self.get_locked(), 282 'Attributes': dbus.Dictionary(self.attributes, signature='ss', variant_level=1), 283 'Label': self.label, 284 'Created': dbus.UInt64(self.created), 285 'Modified': dbus.UInt64(self.modified), 286 'Type': self.type 287 } 288 else: 289 raise InvalidArgs('Unknown %s interface' % interface_name) 290 291 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ssv') 292 def Set(self, interface_name, property_name, new_value): 293 if interface_name != 'org.freedesktop.Secret.Item': 294 raise InvalidArgs('Unknown %s interface' % interface_name) 295 if property_name == "Label": 296 self.label = str(new_value) 297 elif property_name == "Attributes": 298 self.attributes = dict(new_value) 299 elif property_name == "Type": 300 self.type = str(new_value) 301 else: 302 raise InvalidArgs('Not writable %s property' % property_name) 303 self.PropertiesChanged(interface_name, { property_name: new_value }, []) 304 305 @dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as') 306 def PropertiesChanged(self, interface_name, changed_properties, invalidated_properties): 307 self.modified = time.time() 308 309 310class SecretCollection(dbus.service.Object): 311 SUPPORTS_MULTIPLE_OBJECT_PATHS = True 312 313 def __init__(self, service, identifier=None, label="Collection", locked=False, 314 confirm=False, master=None): 315 if identifier is None: 316 identifier = label 317 identifier = encode_identifier(identifier) 318 self.service = service 319 self.identifier = identifier 320 self.label = label or "Unnamed collection" 321 self.locked = locked 322 self.items = { } 323 self.confirm = confirm 324 self.master = None 325 self.created = self.modified = time.time() 326 self.aliased = set() 327 self.path = "%s%s" % (COLLECTION_PREFIX, identifier) 328 dbus.service.Object.__init__(self, service.bus_name, self.path) 329 self.service.add_collection(self) 330 objects[self.path] = self 331 332 def add_item(self, item): 333 self.items[item.path] = item 334 for alias in self.aliased: 335 item.add_alias(alias) 336 337 def remove_item(self, item): 338 for alias in self.aliased: 339 item.remove_alias(alias) 340 del self.items[item.path] 341 342 def add_alias(self, name): 343 if name in self.aliased: 344 return 345 self.aliased.add(name) 346 for item in self.items.values(): 347 item.add_alias(name) 348 path = alias_path(name) 349 objects[path] = self 350 self.add_to_connection(self.connection, path) 351 352 def remove_alias(self, name): 353 if name not in self.aliased: 354 return 355 path = alias_path(name) 356 self.aliased.remove(name) 357 del objects[path] 358 self.remove_from_connection(self.connection, path) 359 for item in self.items.values(): 360 item.remove_alias(name) 361 362 def search_items(self, attributes): 363 results = [] 364 for item in self.items.values(): 365 if item.match_attributes(attributes): 366 results.append(item) 367 return results 368 369 def get_locked(self): 370 return self.locked 371 372 def perform_xlock(self, lock): 373 self.locked = lock 374 for item in self.items.values(): 375 self.PropertiesChanged('org.freedesktop.Secret.Item', { "Locked" : lock }, []) 376 self.PropertiesChanged('org.freedesktop.Secret.Collection', { "Locked" : lock }, []) 377 378 def perform_delete(self): 379 for item in self.items.values(): 380 item.perform_delete() 381 del objects[self.path] 382 self.service.remove_collection(self) 383 for alias in list(self.aliased): 384 self.remove_alias(alias) 385 self.remove_from_connection() 386 387 @dbus.service.method('org.freedesktop.Secret.Collection', byte_arrays=True, sender_keyword='sender') 388 def CreateItem(self, properties, value, replace, sender=None): 389 session_path = value[0] 390 session = objects.get(session_path, None) 391 if not session or session.sender != sender: 392 raise InvalidArgs("session invalid: %s" % session_path) 393 394 attributes = properties.get("org.freedesktop.Secret.Item.Attributes", None) 395 label = properties.get("org.freedesktop.Secret.Item.Label", None) 396 type = properties.get("org.freedesktop.Secret.Item.Type", None) 397 (secret, content_type) = session.decode_secret(value) 398 item = None 399 400 if replace: 401 items = self.search_items(attributes) 402 if items: 403 item = items[0] 404 if item is None: 405 item = SecretItem(self, next_identifier(), label, attributes, type=type, 406 secret=secret, confirm=False, content_type=content_type) 407 else: 408 item.label = label 409 item.type = type 410 item.secret = secret 411 item.attributes = attributes 412 item.content_type = content_type 413 return (dbus.ObjectPath(item.path), dbus.ObjectPath("/")) 414 415 @dbus.service.method('org.freedesktop.Secret.Collection', sender_keyword='sender') 416 def Delete(self, sender=None): 417 collection = self 418 def prompt_callback(): 419 collection.perform_delete() 420 return dbus.String("", variant_level=1) 421 if self.confirm: 422 prompt = SecretPrompt(self.service, sender, dismiss=False, 423 action=prompt_callback) 424 return dbus.ObjectPath(prompt.path) 425 else: 426 self.perform_delete() 427 return dbus.ObjectPath("/") 428 429 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v') 430 def Get(self, interface_name, property_name): 431 return self.GetAll(interface_name)[property_name] 432 433 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}') 434 def GetAll(self, interface_name): 435 if interface_name == 'org.freedesktop.Secret.Collection': 436 return { 437 'Locked': self.get_locked(), 438 'Label': self.label, 439 'Created': dbus.UInt64(self.created), 440 'Modified': dbus.UInt64(self.modified), 441 'Items': dbus.Array([dbus.ObjectPath(i.path) for i in self.items.values()], signature='o', variant_level=1) 442 } 443 else: 444 raise InvalidArgs('Unknown %s interface' % interface_name) 445 446 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ssv') 447 def Set(self, interface_name, property_name, new_value): 448 if interface_name != 'org.freedesktop.Secret.Collection': 449 raise InvalidArgs('Unknown %s interface' % interface_name) 450 if property_name == "Label": 451 self.label = str(new_value) 452 else: 453 raise InvalidArgs('Not a writable property %s' % property_name) 454 self.PropertiesChanged(interface_name, { property_name: new_value }, []) 455 456 @dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as') 457 def PropertiesChanged(self, interface_name, changed_properties, invalidated_properties): 458 self.modified = time.time() 459 460 461class SecretService(dbus.service.Object): 462 463 algorithms = { 464 'plain': PlainAlgorithm(), 465 "dh-ietf1024-sha256-aes128-cbc-pkcs7": AesAlgorithm(), 466 } 467 468 def __init__(self, name=None): 469 if name == None: 470 name = bus_name 471 bus = dbus.SessionBus() 472 self.bus_name = dbus.service.BusName(name, allow_replacement=True, replace_existing=True) 473 dbus.service.Object.__init__(self, self.bus_name, '/org/freedesktop/secrets') 474 self.sessions = { } 475 self.prompts = { } 476 self.collections = { } 477 self.aliases = { } 478 self.aliased = { } 479 480 def on_name_owner_changed(owned, old_owner, new_owner): 481 if not new_owner: 482 for session in list(self.sessions.get(old_owner, [])): 483 session.Close() 484 485 bus.add_signal_receiver(on_name_owner_changed, 486 'NameOwnerChanged', 487 'org.freedesktop.DBus') 488 489 def add_standard_objects(self): 490 collection = SecretCollection(self, "english", label="Collection One", locked=False) 491 SecretItem(collection, "1", label="Item One", 492 attributes={ "number": "1", "string": "one", "even": "false" }, 493 secret="111", type="org.mock.type.Store") 494 SecretItem(collection, "2", attributes={ "number": "2", "string": "two", "even": "true" }, secret="222") 495 SecretItem(collection, "3", attributes={ "number": "3", "string": "three", "even": "false" }, secret="3333") 496 497 self.set_alias('default', collection) 498 499 collection = SecretCollection(self, "spanish", locked=True) 500 SecretItem(collection, "10", attributes={ "number": "1", "string": "uno", "even": "false" }, secret="111") 501 SecretItem(collection, "20", attributes={ "number": "2", "string": "dos", "even": "true" }, secret="222") 502 SecretItem(collection, "30", attributes={ "number": "3", "string": "tres", "even": "false" }, secret="3333") 503 504 collection = SecretCollection(self, "empty", locked=False) 505 collection = SecretCollection(self, "session", label="Session Keyring", locked=False) 506 507 def listen(self): 508 global ready_pipe 509 loop = gobject.MainLoop() 510 if ready_pipe >= 0: 511 os.write(ready_pipe, "GO") 512 os.close(ready_pipe) 513 ready_pipe = -1 514 loop.run() 515 516 def add_session(self, session): 517 if session.sender not in self.sessions: 518 self.sessions[session.sender] = [] 519 self.sessions[session.sender].append(session) 520 521 def remove_session(self, session): 522 self.sessions[session.sender].remove(session) 523 524 def add_collection(self, collection): 525 self.collections[collection.path] = collection 526 527 def remove_collection(self, collection): 528 for alias in list(collection.aliased): 529 self.remove_alias(alias) 530 del self.collections[collection.path] 531 532 def set_alias(self, name, collection): 533 self.remove_alias(name) 534 collection.add_alias(name) 535 self.aliases[name] = collection 536 537 def remove_alias(self, name): 538 if name in self.aliases: 539 collection = self.aliases[name] 540 collection.remove_alias(name) 541 del self.aliases[name] 542 543 def add_prompt(self, prompt): 544 if prompt.sender not in self.prompts: 545 self.prompts[prompt.sender] = [] 546 self.prompts[prompt.sender].append(prompt) 547 548 def remove_prompt (self, prompt): 549 self.prompts[prompt.sender].remove(prompt) 550 551 def find_item(self, object): 552 parts = object.rsplit("/", 1) 553 if len(parts) == 2 and parts[0] in self.collections: 554 return self.collections[parts[0]].get(parts[1], None) 555 return None 556 557 @dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender') 558 def Lock(self, paths, lock=True, sender=None): 559 locked = [] 560 prompts = [] 561 for path in paths: 562 if path not in objects: 563 continue 564 object = objects[path] 565 if object.get_locked() == lock: 566 locked.append(path) 567 elif not object.confirm: 568 object.perform_xlock(lock) 569 locked.append(path) 570 else: 571 prompts.append(object) 572 def prompt_callback(): 573 for object in prompts: 574 object.perform_xlock(lock) 575 return dbus.Array([o.path for o in prompts], signature='o') 576 locked = dbus.Array(locked, signature='o') 577 if prompts: 578 prompt = SecretPrompt(self, sender, dismiss=False, action=prompt_callback) 579 return (locked, dbus.ObjectPath(prompt.path)) 580 else: 581 return (locked, dbus.ObjectPath("/")) 582 583 @dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender') 584 def Unlock(self, paths, sender=None): 585 return self.Lock(paths, lock=False, sender=sender) 586 587 @dbus.service.method('org.freedesktop.Secret.Service', byte_arrays=True, sender_keyword='sender') 588 def OpenSession(self, algorithm, param, sender=None): 589 assert type(algorithm) == dbus.String 590 591 if algorithm not in self.algorithms: 592 raise NotSupported("algorithm %s is not supported" % algorithm) 593 594 return self.algorithms[algorithm].negotiate(self, sender, param) 595 596 @dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender') 597 def CreateCollection(self, properties, alias, sender=None): 598 label = properties.get("org.freedesktop.Secret.Collection.Label", None) 599 service = self 600 def prompt_callback(): 601 collection = SecretCollection(service, None, label, locked=False, confirm=True) 602 return dbus.ObjectPath(collection.path, variant_level=1) 603 prompt = SecretPrompt(self, sender, dismiss=False, action=prompt_callback) 604 return (dbus.ObjectPath("/"), dbus.ObjectPath(prompt.path)) 605 606 @dbus.service.method('org.freedesktop.Secret.Service') 607 def SearchItems(self, attributes): 608 locked = [ ] 609 unlocked = [ ] 610 items = [ ] 611 for collection in self.collections.values(): 612 items = collection.search_items(attributes) 613 if collection.get_locked(): 614 locked.extend([item.path for item in items]) 615 else: 616 unlocked.extend([item.path for item in items]) 617 return (dbus.Array(unlocked, "o"), dbus.Array(locked, "o")) 618 619 @dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender') 620 def GetSecrets(self, item_paths, session_path, sender=None): 621 session = objects.get(session_path, None) 622 if not session or session.sender != sender: 623 raise InvalidArgs("session invalid: %s" % session_path) 624 results = dbus.Dictionary(signature="o(oayays)") 625 for item_path in item_paths: 626 item = objects.get(item_path, None) 627 if item and not item.get_locked(): 628 results[item_path] = item.GetSecret(session_path, sender) 629 return results 630 631 @dbus.service.method('org.freedesktop.Secret.Service') 632 def ReadAlias(self, name): 633 if name not in self.aliases: 634 return dbus.ObjectPath("/") 635 return dbus.ObjectPath(self.aliases[name].path) 636 637 @dbus.service.method('org.freedesktop.Secret.Service') 638 def SetAlias(self, name, collection): 639 if collection not in self.collections: 640 raise NoSuchObject("no such Collection") 641 self.set_alias(name, self.collections[collection]) 642 643 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v') 644 def Get(self, interface_name, property_name): 645 return self.GetAll(interface_name)[property_name] 646 647 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}') 648 def GetAll(self, interface_name): 649 if interface_name == 'org.freedesktop.Secret.Service': 650 return { 651 'Collections': dbus.Array([dbus.ObjectPath(c.path) for c in self.collections.values()], signature='o', variant_level=1) 652 } 653 else: 654 raise InvalidArgs('Unknown %s interface' % interface_name) 655 656 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ssv') 657 def Set(self, interface_name, property_name, new_value): 658 if interface_name != 'org.freedesktop.Secret.Collection': 659 raise InvalidArgs('Unknown %s interface' % interface_name) 660 raise InvalidArgs('Not a writable property %s' % property_name) 661 662 663def parse_options(args): 664 global bus_name, ready_pipe 665 try: 666 opts, args = getopt.getopt(args, "nr", ["name=", "ready="]) 667 except getopt.GetoptError, err: 668 print str(err) 669 sys.exit(2) 670 for o, a in opts: 671 if o in ("-n", "--name"): 672 bus_name = a 673 elif o in ("-r", "--ready"): 674 ready_pipe = int(a) 675 else: 676 assert False, "unhandled option" 677 return args 678 679parse_options(sys.argv[1:]) 680