# -*- coding: utf-8 -*-
# Copyright (C) 2012, 2013 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

"""Read and write Ogg Opus comments.

This module handles Opus files wrapped in an Ogg bitstream. The
first Opus stream found is used.

Based on http://tools.ietf.org/html/draft-terriberry-oggopus-01
"""

__all__ = ["OggOpus", "Open", "delete"]

import struct

from mutagen import StreamInfo
from mutagen._compat import BytesIO
from mutagen._util import get_size, loadfile, convert_error
from mutagen._tags import PaddingInfo
from mutagen._vorbis import VCommentDict
from mutagen.ogg import OggPage, OggFileType, error as OggError


class error(OggError):
    """Base class for the errors raised by this module."""

    pass


class OggOpusHeaderError(error):
    """Raised when the Opus headers are missing, malformed or unsupported."""

    pass


class OggOpusInfo(StreamInfo):
    """OggOpusInfo()

    Ogg Opus stream information.

    Attributes:
        length (`float`): File length in seconds, as a float
        channels (`int`): Number of channels
    """

    length = 0
    channels = 0

    def __init__(self, fileobj):
        """Parse the ``OpusHead`` identification header from `fileobj`.

        Pages are read one after another until one is found whose first
        packet starts with ``OpusHead``.

        Raises:
            OggOpusHeaderError: if the page carrying the header does not
                start a stream, if the header packet is too short to be
                parsed, or if the major version is not 0.
        """

        # A page may carry no packets at all; skip it instead of failing
        # on the ``packets[0]`` lookup.
        page = OggPage(fileobj)
        while not page.packets or \
                not page.packets[0].startswith(b"OpusHead"):
            page = OggPage(fileobj)

        self.serial = page.serial

        if not page.first:
            raise OggOpusHeaderError(
                "page has ID header, but doesn't start a stream")

        # The 11 bytes following the 8 byte "OpusHead" magic. A shorter
        # packet makes struct.unpack() fail; report that through this
        # module's own error type rather than leaking struct.error.
        try:
            (version, self.channels, pre_skip, orig_sample_rate, output_gain,
             channel_map) = struct.unpack("<BBHIhB", page.packets[0][8:19])
        except struct.error:
            raise OggOpusHeaderError("OpusHead packet is truncated")

        self.__pre_skip = pre_skip

        # only the higher 4 bits change on incompatible changes
        major = version >> 4
        if major != 0:
            raise OggOpusHeaderError("version %r unsupported" % major)

    def _post_tags(self, fileobj):
        """Compute `length` from the last page of the stream.

        Raises:
            OggOpusHeaderError: if no last page is found for this serial.
        """

        page = OggPage.find_last(fileobj, self.serial, finishing=True)
        if page is None:
            raise OggOpusHeaderError
        # The position is divided by a fixed 48000, after removing the
        # pre-skip value read from the identification header.
        self.length = (page.position - self.__pre_skip) / float(48000)

    def pprint(self):
        """Return a short human readable description of the stream."""

        return u"Ogg Opus, %.2f seconds" % (self.length)


class OggOpusVComment(VCommentDict):
    """Opus comments embedded in an Ogg bitstream."""

    def __get_comment_pages(self, fileobj, info):
        """Return the list of pages holding the ``OpusTags`` packet.

        Reading starts at the current position of `fileobj`; only pages
        with the serial number stored in `info` are collected.
        """

        # find the first tags page with the right serial; pages without
        # any packets can't be it and are skipped
        page = OggPage(fileobj)
        while ((info.serial != page.serial) or
               not page.packets or
               not page.packets[0].startswith(b"OpusTags")):
            page = OggPage(fileobj)

        # get all comment pages
        pages = [page]
        while not (pages[-1].complete or len(pages[-1].packets) > 1):
            page = OggPage(fileobj)
            if page.serial == pages[0].serial:
                pages.append(page)

        return pages

    def __init__(self, fileobj, info):
        """Load the comments of the stream described by `info`."""

        pages = self.__get_comment_pages(fileobj, info)
        data = OggPage.to_packets(pages)[0][8:]  # Strip OpusTags
        fileobj = BytesIO(data)
        super(OggOpusVComment, self).__init__(fileobj, framing=False)
        # NOTE(review): self._size is not set in this module; presumably
        # the parent constructor records the size of the parsed comment.
        self._padding = len(data) - self._size

        # in case the LSB of the first byte after v-comment is 1, preserve the
        # following data
        padding_flag = fileobj.read(1)
        if padding_flag and ord(padding_flag) & 0x1:
            self._pad_data = padding_flag + fileobj.read()
            self._padding = 0  # we have to preserve, so no padding
        else:
            self._pad_data = b""

    def _inject(self, fileobj, padding_func):
        """Write the tag data into `fileobj`, replacing the old tag pages.

        `padding_func` is handed to ``PaddingInfo._get_padding`` to decide
        how much zero padding follows the comment. It is not used when
        trailing data from the original packet has to be preserved.
        """

        fileobj.seek(0)
        info = OggOpusInfo(fileobj)
        old_pages = self.__get_comment_pages(fileobj, info)

        packets = OggPage.to_packets(old_pages)
        vcomment_data = b"OpusTags" + self.write(framing=False)

        if self._pad_data:
            # if we have padding data to preserve we can't add more padding
            # as long as we don't know the structure of what follows
            packets[0] = vcomment_data + self._pad_data
        else:
            content_size = get_size(fileobj) - len(packets[0])  # approx
            padding_left = len(packets[0]) - len(vcomment_data)
            info = PaddingInfo(padding_left, content_size)
            new_padding = info._get_padding(padding_func)
            packets[0] = vcomment_data + b"\x00" * new_padding

        new_pages = OggPage._from_packets_try_preserve(packets, old_pages)
        OggPage.replace(fileobj, old_pages, new_pages)


class OggOpus(OggFileType):
    """OggOpus(filething)

    An Ogg Opus file.

    Arguments:
        filething (filething)

    Attributes:
        info (`OggOpusInfo`)
        tags (`mutagen._vorbis.VCommentDict`)

    """

    _Info = OggOpusInfo
    _Tags = OggOpusVComment
    _Error = OggOpusHeaderError
    _mimes = ["audio/ogg", "audio/ogg; codecs=opus"]

    info = None
    tags = None

    @staticmethod
    def score(filename, fileobj, header):
        """Return 1 if `header` starts with ``OggS`` and contains
        ``OpusHead``, otherwise 0.
        """

        return (header.startswith(b"OggS") * (b"OpusHead" in header))


Open = OggOpus


@convert_error(IOError, error)
@loadfile(method=False, writable=True)
def delete(filething):
    """ delete(filething)

    Arguments:
        filething (filething)
    Raises:
        mutagen.MutagenError

    Remove tags from a file.
    """

    t = OggOpus(filething)
    # Loading moved the file position; rewind before deleting the tags.
    filething.fileobj.seek(0)
    t.delete(filething)