# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of nbxmpp.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; If not, see <http://www.gnu.org/licenses/>.

from typing import Dict
from typing import Optional

from dataclasses import dataclass

from nbxmpp.protocol import Iq
from nbxmpp.simplexml import Node
from nbxmpp.namespaces import Namespace
from nbxmpp.structs import StanzaHandler
from nbxmpp.errors import StanzaError
from nbxmpp.task import iq_request_task
from nbxmpp.modules.base import BaseModule


class SecurityLabels(BaseModule):
    """Parse security labels on messages and request label catalogs."""

    def __init__(self, client):
        BaseModule.__init__(self, client)

        self._client = client
        self.handlers = [
            StanzaHandler(name='message',
                          callback=self._process_message_security_label,
                          ns=Namespace.SECLABEL,
                          priority=15),
        ]

    def _process_message_security_label(self, _client, stanza, properties):
        """Store the message's security label on ``properties``.

        Does nothing when the stanza has no ``securitylabel`` child.
        When the child cannot be parsed, the error is logged as a
        warning and ``properties`` is left untouched.
        """
        security = stanza.getTag('securitylabel', namespace=Namespace.SECLABEL)
        if security is None:
            return

        try:
            security_label = SecurityLabel.from_node(security)
        except ValueError as error:
            self._log.warning(error)
            return

        properties.security_label = security_label

    @iq_request_task
    def request_catalog(self, jid):
        """Request the security label catalog that applies to ``jid``.

        The request is sent to the client's own domain. The final value
        yielded is a ``Catalog`` holding every well-formed item of the
        response; items without a ``selector`` attribute or without a
        parsable ``securitylabel`` child are skipped.

        Raises:
            StanzaError: the response is an error stanza.
            ValueError: the response carries no ``catalog`` element.
        """
        # NOTE(review): presumably the decorator sends the task object
        # into this first yield; it is not used here - confirm against
        # nbxmpp.task.
        _task = yield

        response = yield _make_catalog_request(self._client.domain, jid)
        if response.isError():
            raise StanzaError(response)

        catalog_node = response.getTag('catalog',
                                       namespace=Namespace.SECLABEL_CATALOG)
        if catalog_node is None:
            # Fail with a clear message instead of an AttributeError on
            # None further down.
            raise ValueError('catalog node missing')

        labels = {}
        default = None
        for item in catalog_node.getTags('item'):
            label = item.getAttr('selector')
            if label is None:
                continue

            security = item.getTag('securitylabel',
                                   namespace=Namespace.SECLABEL)
            if security is None:
                continue

            try:
                security_label = SecurityLabel.from_node(security)
            except ValueError:
                continue

            labels[label] = security_label

            # Only an item that made it into `labels` may become the
            # default, so `default` always refers to an existing key.
            if item.getAttr('default') == 'true':
                default = label

        yield Catalog(labels=labels, default=default)


def _make_catalog_request(domain, jid):
    """Build the ``get`` IQ asking ``domain`` for the catalog of ``jid``."""
    iq = Iq(typ='get', to=domain)
    iq.addChild(name='catalog',
                namespace=Namespace.SECLABEL_CATALOG,
                attrs={'to': jid})
    return iq


@dataclass
class Displaymarking:
    """Text and colors of a ``displaymarking`` element."""

    name: str
    fgcolor: str
    bgcolor: str

    def to_node(self):
        """Serialize to a ``displaymarking`` node.

        ``fgcolor``/``bgcolor`` are omitted when empty or equal to the
        values ``from_node`` substitutes for missing attributes
        (``#000`` and ``#FFF``).
        """
        displaymarking = Node(tag='displaymarking')
        if self.fgcolor and self.fgcolor != '#000':
            displaymarking.setAttr('fgcolor', self.fgcolor)

        if self.bgcolor and self.bgcolor != '#FFF':
            displaymarking.setAttr('bgcolor', self.bgcolor)

        if self.name:
            displaymarking.setData(self.name)

        return displaymarking

    @classmethod
    def from_node(cls, node):
        """Build an instance from a ``displaymarking`` node.

        Missing or empty color attributes fall back to ``#000``
        (foreground) and ``#FFF`` (background).
        """
        return cls(name=node.getData(),
                   fgcolor=node.getAttr('fgcolor') or '#000',
                   bgcolor=node.getAttr('bgcolor') or '#FFF')


@dataclass
class SecurityLabel:
    """A ``securitylabel`` element: optional display marking plus label."""

    # None when the parsed element had no <displaymarking/> child.
    displaymarking: Optional[Displaymarking]
    label: Node

    def to_node(self):
        """Serialize to a ``securitylabel`` node in the SECLABEL namespace."""
        security = Node(tag='securitylabel',
                        attrs={'xmlns': Namespace.SECLABEL})
        if self.displaymarking is not None:
            security.addChild(node=self.displaymarking.to_node())
        security.addChild(node=self.label)
        return security

    @classmethod
    def from_node(cls, security):
        """Build an instance from a ``securitylabel`` node.

        Raises:
            ValueError: the node has no ``label`` child.
        """
        displaymarking = security.getTag('displaymarking')
        if displaymarking is not None:
            displaymarking = Displaymarking.from_node(displaymarking)

        label = security.getTag('label')
        if label is None:
            raise ValueError('label node missing')

        return cls(displaymarking=displaymarking, label=label)


@dataclass
class Catalog:
    """Security labels of a catalog, keyed by their selector."""

    labels: Dict[str, SecurityLabel]
    # Selector of the item flagged default='true'; None when no item was.
    default: Optional[str]

    def get_label_names(self):
        """Return the selectors of all labels as a new list."""
        return list(self.labels)