# Copyright (c) 2017, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.
import base64
import bz2
import zlib

from mixbox import entities
from mixbox import fields
from mixbox.vendor import six
from mixbox.compat import xor

import cybox.bindings.artifact_object as artifact_binding
from cybox.common import ObjectProperties, String, HashList


def validate_artifact_type(instance, value):
    """Preset hook for ``Artifact.type_``.

    Accepts ``None`` or any member of ``Artifact.TYPES``.

    Raises:
        ValueError: if ``value`` is anything else.
    """
    if value is None or value in Artifact.TYPES:
        return

    err = "Type must be one of %s. Received '%s'." % (Artifact.TYPES, value)
    raise ValueError(err)


def validate_byte_order_endianness(instance, value):
    """Preset hook for ``RawArtifact.byte_order``.

    Accepts ``None`` or any member of ``RawArtifact.ENDIANNESS``.

    Raises:
        ValueError: if ``value`` is anything else.
    """
    if value is None or value in RawArtifact.ENDIANNESS:
        return

    # The message names the byte order (not "Type") so that it is not
    # confused with the Artifact type validation error above.
    err = "Byte order must be one of %s. Received '%s'." % (RawArtifact.ENDIANNESS, value)
    raise ValueError(err)


class RawArtifact(String):
    """The Raw_Artifact element: a string value with an optional byte order."""
    _binding = artifact_binding
    _binding_class = _binding.RawArtifactType
    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'

    BIG_ENDIAN = "Big-endian"
    LITTLE_ENDIAN = "Little-endian"
    MIDDLE_ENDIAN = "Middle-endian"
    # The only values accepted for `byte_order` (besides None).
    ENDIANNESS = (BIG_ENDIAN, LITTLE_ENDIAN, MIDDLE_ENDIAN)

    byte_order = fields.TypedField("byte_order", preset_hook=validate_byte_order_endianness)


class Compression(entities.Entity):
    """A Compression packaging layer

    Currently only zlib and bz2 are supported.
    Also, compression_mechanism_ref is not currently supported.

    Subclasses set ``_COMPRESSION_TYPE`` and implement ``pack``/``unpack``.
    """
    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'
    _binding = artifact_binding
    _binding_class = _binding.CompressionType
    _COMPRESSION_TYPE = None  # overridden by subclasses

    compression_mechanism = fields.TypedField("compression_mechanism")
    compression_mechanism_ref = fields.TypedField("compression_mechanism_ref")

    def __init__(self, compression_mechanism=None, compression_mechanism_ref=None):
        super(Compression, self).__init__()
        self.compression_mechanism = compression_mechanism
        self.compression_mechanism_ref = compression_mechanism_ref

    def pack(self, data):
        """This should accept byte data and return byte data"""
        raise NotImplementedError()

    def unpack(self, packed_data):
        """This should accept byte data and return byte data"""
        raise NotImplementedError()


class Encryption(entities.Entity):
    """
    An encryption packaging layer.

    Subclasses set ``_ENCRYPTION_TYPE`` and implement ``pack``/``unpack``.
    """
    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'
    _binding = artifact_binding
    _binding_class = _binding.EncryptionType
    _ENCRYPTION_TYPE = None  # overridden by subclasses

    encryption_mechanism = fields.TypedField("encryption_mechanism")
    encryption_mechanism_ref = fields.TypedField("encryption_mechanism_ref")
    encryption_key = fields.TypedField("encryption_key")
    encryption_key_ref = fields.TypedField("encryption_key_ref")

    def __init__(self, encryption_mechanism=None, encryption_key=None,
                 encryption_mechanism_ref=None, encryption_key_ref=None):
        super(Encryption, self).__init__()
        self.encryption_mechanism = encryption_mechanism
        self.encryption_key = encryption_key
        self.encryption_mechanism_ref = encryption_mechanism_ref
        self.encryption_key_ref = encryption_key_ref

    def pack(self, data):
        """This should accept byte data and return byte data"""
        raise NotImplementedError()

    def unpack(self, packed_data):
        """This should accept byte data and return byte data"""
        raise NotImplementedError()


class Encoding(entities.Entity):
    """
    An encoding packaging layer.

    Currently only base64 with a standard alphabet is supported.

    Subclasses set ``_ENCODING_TYPE`` and implement ``pack``/``unpack``.
    """
    # Same namespace as the other packaging layers defined in this module.
    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'
    _binding = artifact_binding
    _binding_class = _binding.EncodingType
    _ENCODING_TYPE = None  # overridden by subclasses

    algorithm = fields.TypedField("algorithm")
    character_set = fields.TypedField("character_set")
    custom_character_set_ref = fields.TypedField("custom_character_set_ref")

    def __init__(self, algorithm=None, character_set=None, custom_character_set_ref=None):
        super(Encoding, self).__init__()
        self.algorithm = algorithm
        self.character_set = character_set
        self.custom_character_set_ref = custom_character_set_ref

    def pack(self, data):
        """This should accept byte data and return byte data"""
        raise NotImplementedError()

    def unpack(self, packed_data):
        """This should accept byte data and return byte data"""
        raise NotImplementedError()


class EncryptionFactory(entities.EntityFactory):
    """Maps an ``encryption_mechanism`` value to an Encryption subclass."""
    # Registry filled by `register_extension`, keyed by `_ENCRYPTION_TYPE`.
    _ENCRYPTION_EXT_MAP = {}

    @classmethod
    def entity_class(cls, key):
        """Return the class registered for ``key``, or ``Encryption``."""
        return cls._ENCRYPTION_EXT_MAP.get(key, Encryption)

    @classmethod
    def dictkey(cls, mapping):
        """Return the lookup key stored in a dictionary representation."""
        return mapping.get("encryption_mechanism")

    @classmethod
    def objkey(cls, obj):
        """Return the lookup key stored on a binding object."""
        return obj.encryption_mechanism

    @classmethod
    def register_extension(cls, new_cls):
        """Class decorator: register ``new_cls`` under its ``_ENCRYPTION_TYPE``."""
        cls._ENCRYPTION_EXT_MAP[new_cls._ENCRYPTION_TYPE] = new_cls
        return new_cls


class CompressionFactory(entities.EntityFactory):
    """Maps a ``compression_mechanism`` value to a Compression subclass."""
    # Registry filled by `register_extension`, keyed by `_COMPRESSION_TYPE`.
    _COMPRESSION_EXT_MAP = {}

    @classmethod
    def entity_class(cls, key):
        """Return the class registered for ``key``, or ``Compression``."""
        return cls._COMPRESSION_EXT_MAP.get(key, Compression)

    @classmethod
    def dictkey(cls, mapping):
        """Return the lookup key stored in a dictionary representation."""
        return mapping.get("compression_mechanism")

    @classmethod
    def objkey(cls, obj):
        """Return the lookup key stored on a binding object."""
        return obj.compression_mechanism

    @classmethod
    def register_extension(cls, new_cls):
        """Class decorator: register ``new_cls`` under its ``_COMPRESSION_TYPE``."""
        cls._COMPRESSION_EXT_MAP[new_cls._COMPRESSION_TYPE] = new_cls
        return new_cls


class EncodingFactory(entities.EntityFactory):
    """Maps an ``algorithm`` value to an Encoding subclass."""
    # Registry filled by `register_extension`, keyed by `_ENCODING_TYPE`.
    _ENCODING_EXT_MAP = {}

    @classmethod
    def entity_class(cls, key):
        """Return the class registered for ``key``, or ``Encoding``."""
        return cls._ENCODING_EXT_MAP.get(key, Encoding)

    @classmethod
    def dictkey(cls, mapping):
        """Return the lookup key stored in a dictionary representation."""
        return mapping.get("algorithm", "Base64")  # default is Base64

    @classmethod
    def objkey(cls, obj):
        """Return the lookup key stored on a binding object."""
        return getattr(obj, "algorithm", "Base64")  # default is Base64

    @classmethod
    def register_extension(cls, new_cls):
        """Class decorator: register ``new_cls`` under its ``_ENCODING_TYPE``."""
        cls._ENCODING_EXT_MAP[new_cls._ENCODING_TYPE] = new_cls
        return new_cls


@CompressionFactory.register_extension
class ZlibCompression(Compression):
    """Compression layer backed by the ``zlib`` module."""
    _COMPRESSION_TYPE = "zlib"

    def __init__(self):
        super(ZlibCompression, self).__init__(compression_mechanism="zlib")

    def pack(self, data):
        """Return ``data`` (bytes) compressed with zlib."""
        return zlib.compress(data)

    def unpack(self, packed_data):
        """Return ``packed_data`` (bytes) decompressed with zlib."""
        return zlib.decompress(packed_data)


@CompressionFactory.register_extension
class Bz2Compression(Compression):
    """Compression layer backed by the ``bz2`` module."""
    _COMPRESSION_TYPE = "bz2"

    def __init__(self):
        super(Bz2Compression, self).__init__(compression_mechanism="bz2")

    def pack(self, data):
        """Return ``data`` (bytes) compressed with bz2."""
        return bz2.compress(data)

    def unpack(self, packed_data):
        """Return ``packed_data`` (bytes) decompressed with bz2."""
        return bz2.decompress(packed_data)


@EncryptionFactory.register_extension
class XOREncryption(Encryption):
    """Encryption layer that XORs the data with ``encryption_key``."""
    _ENCRYPTION_TYPE = "xor"

    def __init__(self, key=None):
        super(XOREncryption, self).__init__(
            encryption_mechanism="xor",
            encryption_key=key
        )

    def pack(self, data):
        """Return ``data`` XORed with the key (via ``mixbox.compat.xor``)."""
        return xor(data, self.encryption_key)

    def unpack(self, packed_data):
        """Return ``packed_data`` XORed with the key; same operation as pack."""
        return xor(packed_data, self.encryption_key)


@EncryptionFactory.register_extension
class PasswordProtectedZipEncryption(Encryption):
    """Encryption layer for data stored in a password-protected zip archive.

    Only unpacking is supported.
    """
    _ENCRYPTION_TYPE = "PasswordProtected"

    def __init__(self, key=None):
        super(PasswordProtectedZipEncryption, self).__init__(
            encryption_mechanism="PasswordProtected",
            encryption_key=key
        )

    # `pack` is not implemented

    def unpack(self, packed_data):
        """Return the contents of the first member of the zip archive.

        Args:
            packed_data: the raw bytes of a zip archive.

        Returns:
            The bytes of the first archive member, read using
            ``encryption_key`` as the password.
        """
        from zipfile import ZipFile

        # zipfile requires a bytes password on Python 3; a key given as
        # text is encoded before use.
        password = self.encryption_key
        if isinstance(password, six.text_type):
            password = password.encode('utf-8')

        # The archive is binary data, so it must be wrapped in a bytes
        # buffer (a text buffer rejects bytes on Python 3).
        buf = six.BytesIO(packed_data)
        with ZipFile(buf, 'r') as myzip:
            # Assume there is only one member in the archive, and that it
            # contains the artifact data. Ignore the name.
            filename = myzip.namelist()[0]
            data = myzip.read(filename, password)

        return data


@EncodingFactory.register_extension
class Base64Encoding(Encoding):
    """Encoding layer using standard-alphabet Base64."""
    _ENCODING_TYPE = "Base64"

    def __init__(self):
        super(Base64Encoding, self).__init__(algorithm="Base64")

    def pack(self, data):
        """Return ``data`` (bytes) Base64-encoded, as bytes."""
        return base64.b64encode(data)

    def unpack(self, packed_data):
        """Return the bytes decoded from Base64 ``packed_data``."""
        return base64.b64decode(packed_data)


class Packaging(entities.Entity):
    """An individual packaging layer."""
    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'
    _binding = artifact_binding
    _binding_class = _binding.PackagingType

    is_encrypted = fields.BooleanField("is_encrypted")
    is_compressed = fields.BooleanField("is_compressed")
    compression = fields.TypedField("Compression", Compression, factory=CompressionFactory, multiple=True)
    encryption = fields.TypedField("Encryption", Encryption, factory=EncryptionFactory, multiple=True)
    encoding = fields.TypedField("Encoding", Encoding, factory=EncodingFactory, multiple=True)

    def __init__(self, is_encrypted=None, is_compressed=None, compression=None, encryption=None, encoding=None):
        super(Packaging, self).__init__()
        self.is_encrypted = is_encrypted
        self.is_compressed = is_compressed
        self.compression = compression
        self.encryption = encryption
        self.encoding = encoding


class Artifact(ObjectProperties):
    """The CybOX Artifact object: raw data plus its packaging layers."""
    # Warning: Do not attempt to get or set Raw_Artifact directly. Use `data`
    # or `packed_data` respectively. The Raw_Artifact value will be set on
    # export. You can set BaseObjectProperties or PatternFieldGroup attributes.
    _binding = artifact_binding
    _binding_class = _binding.ArtifactObjectType
    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'
    _XSI_NS = "ArtifactObj"
    _XSI_TYPE = "ArtifactObjectType"

    TYPE_FILE = "File"
    TYPE_MEMORY = "Memory Region"
    TYPE_FILE_SYSTEM = "File System Fragment"
    TYPE_NETWORK = "Network Traffic"
    TYPE_GENERIC = "Generic Data Region"
    # The only values accepted for `type_` (besides None).
    TYPES = (TYPE_FILE, TYPE_FILE_SYSTEM, TYPE_GENERIC, TYPE_MEMORY, TYPE_NETWORK)

    hashes = fields.TypedField("Hashes", HashList)
    packaging = fields.TypedField("Packaging", Packaging)
    type_ = fields.TypedField("type_", key_name="type", preset_hook=validate_artifact_type)
    content_type = fields.TypedField("content_type")
    content_type_version = fields.TypedField("content_type_version")
    suspected_malicious = fields.TypedField("suspected_malicious")
    # TODO: xs:choice
    raw_artifact = fields.TypedField("Raw_Artifact", RawArtifact)
    raw_artifact_reference = fields.TypedField("Raw_Artifact_Reference")

    def __init__(self, data=None, type_=None):
        super(Artifact, self).__init__()
        self.type_ = type_

        # `data` is the actual binary data that is being encoded in this
        # Artifact. It should use the `str` type on Python 2 or the `bytes`
        # type on Python 3.

        # `packed_data` is the literal character data that comes from (or
        # becomes) the contents of the Raw_Artifact element. It should be a
        # Unicode string (`unicode` on Python 2, `str` on Python 3), and should
        # in general be ASCII-encoded, since any other data should be
        # Base64-encoded.

        # Only one of these two attributes can be set directly. The other can
        # be calculated based on the various `Packaging` types added to this
        # Artifact.

        # We set the private attribute `_packed_data` first, so that the setter
        # for `data` has access to this attribute.
        self._packed_data = None
        self.data = data

    @property
    def data(self):
        """Should return a byte string

        Returns the directly-set data if there is any. Otherwise, if
        `packed_data` was set, it is ASCII-encoded and the packaging layers
        are undone in the opposite order to packing: encodings, then
        encryptions, then compressions, each list in reverse. Returns None
        when neither value is set (empty values count as unset).
        """
        if self._data:
            return self._data
        elif self._packed_data:
            tmp_data = self._packed_data.encode('ascii')
            if self.packaging:
                for p in reversed(self.packaging.encoding):
                    tmp_data = p.unpack(tmp_data)
                for p in reversed(self.packaging.encryption):
                    tmp_data = p.unpack(tmp_data)
                for p in reversed(self.packaging.compression):
                    tmp_data = p.unpack(tmp_data)
            return tmp_data
        else:
            return None

    @data.setter
    def data(self, value):
        """Set the raw byte data.

        Raises:
            ValueError: if `packed_data` is already set, or if ``value`` is
                neither None nor byte data.
        """
        if self._packed_data:
            raise ValueError("packed_data already set, can't set data")
        if value is not None and not isinstance(value, six.binary_type):
            msg = ("Artifact data must be either None or byte data, not a "
                   "Unicode string.")
            raise ValueError(msg)
        self._data = value

    @property
    def packed_data(self):
        """Should return a Unicode string

        Returns the directly-set packed data if there is any. Otherwise, if
        `data` was set, the packaging layers are applied in order:
        compressions, then encryptions, then encodings, and the result is
        ASCII-decoded. Returns None when neither value is set (empty values
        count as unset).
        """
        if self._packed_data:
            return self._packed_data
        elif self._data:
            tmp_data = self._data
            if self.packaging:
                for p in self.packaging.compression:
                    tmp_data = p.pack(tmp_data)
                for p in self.packaging.encryption:
                    tmp_data = p.pack(tmp_data)
                for p in self.packaging.encoding:
                    tmp_data = p.pack(tmp_data)
            return tmp_data.decode('ascii')
        else:
            return None

    @packed_data.setter
    def packed_data(self, value):
        """Set the packed (Raw_Artifact) text.

        Raises:
            ValueError: if `data` is already set, or if ``value`` is neither
                None nor a Unicode string.
        """
        if self._data:
            raise ValueError("data already set, can't set packed_data")
        if value is not None and not isinstance(value, six.text_type):
            msg = ("Artifact packed_data must be either None or a Unicode "
                   "string, not byte data.")
            raise ValueError(msg)
        self._packed_data = value

    def to_obj(self, ns_info=None):
        """Return the binding object, with Raw_Artifact set from `packed_data`."""
        artifact_obj = super(Artifact, self).to_obj(ns_info=ns_info)

        # Evaluate the property once: computing it runs every packaging layer.
        packed = self.packed_data
        if packed:
            if not self.raw_artifact:
                self.raw_artifact = RawArtifact()
            self.raw_artifact.value = packed
            artifact_obj.Raw_Artifact = self.raw_artifact.to_obj(ns_info=ns_info)

        return artifact_obj

    def to_dict(self):
        """Return the dictionary form, with 'raw_artifact' set from `packed_data`."""
        artifact_dict = super(Artifact, self).to_dict()

        # Evaluate the property once: computing it runs every packaging layer.
        packed = self.packed_data
        if packed:
            if not self.raw_artifact:
                self.raw_artifact = RawArtifact()
            self.raw_artifact.value = packed
            artifact_dict['raw_artifact'] = self.raw_artifact.to_dict()

        return artifact_dict

    @classmethod
    def from_obj(cls, cls_obj):
        """Build an Artifact from a binding object; None if ``cls_obj`` is falsy."""
        if not cls_obj:
            return None

        artifact = super(Artifact, cls).from_obj(cls_obj)

        raw_artifact = cls_obj.Raw_Artifact
        if raw_artifact:
            artifact.raw_artifact = RawArtifact.from_obj(raw_artifact)
            artifact.packed_data = six.text_type(artifact.raw_artifact.value)

        return artifact

    @classmethod
    def from_dict(cls, cls_dict):
        """Build an Artifact from a dictionary; None if ``cls_dict`` is falsy."""
        if not cls_dict:
            return None

        artifact = super(Artifact, cls).from_dict(cls_dict)

        raw_artifact = cls_dict.get('raw_artifact')
        if raw_artifact:
            artifact.raw_artifact = RawArtifact.from_dict(raw_artifact)
            artifact.packed_data = six.text_type(artifact.raw_artifact.value)

        return artifact