#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pylint: disable=invalid-name,too-few-public-methods
#

"""
CTypes mapping between libopus functions and Python.
"""

import array
import ctypes  # type: ignore
import typing

import opuslib
import opuslib.api

__author__ = 'Никита Кузнецов <self@svartalf.info>'
__copyright__ = 'Copyright (c) 2012, SvartalF'
__license__ = 'BSD 3-Clause License'


class Decoder(ctypes.Structure):
    """Opus decoder state.
    This contains the complete state of an Opus decoder.

    The structure is opaque on the Python side (no `_fields_`); it is only
    ever handled through `DecoderPointer`.
    """
    pass


DecoderPointer = ctypes.POINTER(Decoder)


libopus_get_size = opuslib.api.libopus.opus_decoder_get_size
libopus_get_size.argtypes = (ctypes.c_int,)
libopus_get_size.restype = ctypes.c_int
libopus_get_size.__doc__ = 'Gets the size of an OpusDecoder structure'


libopus_create = opuslib.api.libopus.opus_decoder_create
libopus_create.argtypes = (
    ctypes.c_int,
    ctypes.c_int,
    opuslib.api.c_int_pointer
)
libopus_create.restype = DecoderPointer


def create_state(fs: int, channels: int) -> ctypes.Structure:
    """
    Allocates and initializes a decoder state.
    Wrapper for C opus_decoder_create()

    `fs` must be one of 8000, 12000, 16000, 24000, or 48000.

    Internally Opus stores data at 48000 Hz, so that should be the default
    value for Fs. However, the decoder can efficiently decode to buffers
    at 8, 12, 16, and 24 kHz so if for some reason the caller cannot use data
    at the full sample rate, or knows the compressed data doesn't use the full
    frequency range, it can request decoding at a reduced rate. Likewise, the
    decoder is capable of filling in either mono or interleaved stereo pcm
    buffers, at the caller's request.

    :param fs: Sample rate to decode at (Hz).
    :param channels: Number of channels (1 or 2) to decode.
    :return: Pointer to the newly created decoder state.
    :raises opuslib.exceptions.OpusError: If libopus reports a non-zero
        result code.
    """
    # libopus writes its status into this out-parameter.
    result_code = ctypes.c_int()

    decoder_state = libopus_create(
        fs,
        channels,
        ctypes.byref(result_code)
    )

    # Compare by value: identity checks against int literals only work by
    # accident of CPython's small-integer cache.
    if result_code.value != 0:
        raise opuslib.exceptions.OpusError(result_code.value)

    return decoder_state


libopus_packet_get_bandwidth = opuslib.api.libopus.opus_packet_get_bandwidth
# `argtypes` must be a sequence (,) of types!
libopus_packet_get_bandwidth.argtypes = (ctypes.c_char_p,)
libopus_packet_get_bandwidth.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def packet_get_bandwidth(data: bytes) -> typing.Union[int, typing.Any]:
    """
    Gets the bandwidth of an Opus packet.

    :param data: Opus packet.
    :return: Value returned by opus_packet_get_bandwidth().
    :raises opuslib.exceptions.OpusError: If libopus returns a negative value.
    """
    data_pointer = ctypes.c_char_p(data)

    result = libopus_packet_get_bandwidth(data_pointer)

    if result < 0:
        raise opuslib.exceptions.OpusError(result)

    return result


libopus_packet_get_nb_channels = opuslib.api.libopus.opus_packet_get_nb_channels  # NOQA
# `argtypes` must be a sequence (,) of types!
libopus_packet_get_nb_channels.argtypes = (ctypes.c_char_p,)
libopus_packet_get_nb_channels.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def packet_get_nb_channels(data: bytes) -> typing.Union[int, typing.Any]:
    """
    Gets the number of channels from an Opus packet.

    :param data: Opus packet.
    :return: Value returned by opus_packet_get_nb_channels().
    :raises opuslib.exceptions.OpusError: If libopus returns a negative value.
    """
    data_pointer = ctypes.c_char_p(data)

    result = libopus_packet_get_nb_channels(data_pointer)

    if result < 0:
        raise opuslib.exceptions.OpusError(result)

    return result


libopus_packet_get_nb_frames = opuslib.api.libopus.opus_packet_get_nb_frames
libopus_packet_get_nb_frames.argtypes = (ctypes.c_char_p, ctypes.c_int)
libopus_packet_get_nb_frames.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def packet_get_nb_frames(
        data: bytes,
        length: typing.Optional[int] = None
) -> typing.Union[int, typing.Any]:
    """
    Gets the number of frames in an Opus packet.

    :param data: Opus packet.
    :param length: Length of the packet in bytes; defaults to `len(data)`.
    :return: Value returned by opus_packet_get_nb_frames().
    :raises opuslib.exceptions.OpusError: If libopus returns a negative value.
    """
    data_pointer = ctypes.c_char_p(data)

    if length is None:
        length = len(data)

    result = libopus_packet_get_nb_frames(data_pointer, ctypes.c_int(length))

    if result < 0:
        raise opuslib.exceptions.OpusError(result)

    return result


libopus_packet_get_samples_per_frame = \
    opuslib.api.libopus.opus_packet_get_samples_per_frame
libopus_packet_get_samples_per_frame.argtypes = (ctypes.c_char_p, ctypes.c_int)
libopus_packet_get_samples_per_frame.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def packet_get_samples_per_frame(
        data: bytes,
        fs: int
) -> typing.Union[int, typing.Any]:
    """
    Gets the number of samples per frame from an Opus packet.

    :param data: Opus packet.
    :param fs: Sampling rate in Hz.
    :return: Value returned by opus_packet_get_samples_per_frame().
    :raises opuslib.exceptions.OpusError: If libopus returns a negative value.
    """
    data_pointer = ctypes.c_char_p(data)

    # Must call the samples-per-frame binding; calling the nb_frames binding
    # here would interpret `fs` as the packet length.
    result = libopus_packet_get_samples_per_frame(
        data_pointer,
        ctypes.c_int(fs)
    )

    if result < 0:
        raise opuslib.exceptions.OpusError(result)

    return result


libopus_get_nb_samples = opuslib.api.libopus.opus_decoder_get_nb_samples
libopus_get_nb_samples.argtypes = (
    DecoderPointer,
    ctypes.c_char_p,
    ctypes.c_int32
)
libopus_get_nb_samples.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def get_nb_samples(
        decoder_state: ctypes.Structure,
        packet: bytes,
        length: int
) -> typing.Union[int, typing.Any]:
    """
    Gets the number of samples of an Opus packet.

    Parameters
    [in]	dec	OpusDecoder*: Decoder state
    [in]	packet	char*: Opus packet
    [in]	len	opus_int32: Length of packet

    Returns
    Number of samples

    Return values
    OPUS_BAD_ARG	Insufficient data was passed to the function
    OPUS_INVALID_PACKET	The compressed data passed is corrupted or of an
        unsupported type

    Negative return values are raised as opuslib.exceptions.OpusError.
    """
    result = libopus_get_nb_samples(decoder_state, packet, length)

    if result < 0:
        raise opuslib.exceptions.OpusError(result)

    return result


libopus_decode = opuslib.api.libopus.opus_decode
libopus_decode.argtypes = (
    DecoderPointer,
    ctypes.c_char_p,
    ctypes.c_int32,
    opuslib.api.c_int16_pointer,
    ctypes.c_int,
    ctypes.c_int
)
libopus_decode.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def decode(  # pylint: disable=too-many-arguments
        decoder_state: ctypes.Structure,
        opus_data: bytes,
        length: int,
        frame_size: int,
        decode_fec: bool,
        channels: int = 2
) -> typing.Union[bytes, typing.Any]:
    """
    Decode an Opus Frame to PCM.

    Unlike the `opus_decode` function , this function takes an additional
    parameter `channels`, which indicates the number of channels in the frame.

    :param decoder_state: Decoder state, as returned by `create_state`.
    :param opus_data: Opus packet to decode.
    :param length: Length of `opus_data` in bytes.
    :param frame_size: Frame size passed through to opus_decode().
    :param decode_fec: Converted to int and passed as the `decode_fec` flag.
    :param channels: Number of channels in the frame.
    :return: The first `result * channels` 16-bit samples of the output
        buffer as bytes, where `result` is the value returned by
        opus_decode().
    :raises opuslib.exceptions.OpusError: If libopus returns a negative value.
    """
    _decode_fec = int(decode_fec)

    # NOTE: `pcm_size` is a byte count but is used below as an element count,
    # so the buffer holds sizeof(c_int16) times more samples than
    # `frame_size * channels`. The slack is harmless and is kept on purpose.
    pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_int16)
    pcm = (ctypes.c_int16 * pcm_size)()
    pcm_pointer = ctypes.cast(pcm, opuslib.api.c_int16_pointer)

    result = libopus_decode(
        decoder_state,
        opus_data,
        length,
        pcm_pointer,
        frame_size,
        _decode_fec
    )

    if result < 0:
        raise opuslib.exceptions.OpusError(result)

    # Only the first `result * channels` samples of the buffer are returned.
    return array.array('h', pcm[:result * channels]).tobytes()


libopus_decode_float = opuslib.api.libopus.opus_decode_float
libopus_decode_float.argtypes = (
    DecoderPointer,
    ctypes.c_char_p,
    ctypes.c_int32,
    opuslib.api.c_float_pointer,
    ctypes.c_int,
    ctypes.c_int
)
libopus_decode_float.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def decode_float(  # pylint: disable=too-many-arguments
        decoder_state: ctypes.Structure,
        opus_data: bytes,
        length: int,
        frame_size: int,
        decode_fec: bool,
        channels: int = 2
) -> typing.Union[bytes, typing.Any]:
    """
    Decode an Opus Frame to floating point PCM.

    Unlike the `opus_decode_float` function , this function takes an
    additional parameter `channels`, which indicates the number of channels
    in the frame.

    :param decoder_state: Decoder state, as returned by `create_state`.
    :param opus_data: Opus packet to decode.
    :param length: Length of `opus_data` in bytes.
    :param frame_size: Frame size passed through to opus_decode_float().
    :param decode_fec: Converted to int and passed as the `decode_fec` flag.
    :param channels: Number of channels in the frame.
    :return: The first `result * channels` float samples of the output
        buffer as bytes, where `result` is the value returned by
        opus_decode_float().
    :raises opuslib.exceptions.OpusError: If libopus returns a negative value.
    """
    _decode_fec = int(decode_fec)

    # NOTE: `pcm_size` is a byte count but is used below as an element count,
    # so the buffer holds sizeof(c_float) times more samples than
    # `frame_size * channels`. The slack is harmless and is kept on purpose.
    pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_float)
    pcm = (ctypes.c_float * pcm_size)()
    pcm_pointer = ctypes.cast(pcm, opuslib.api.c_float_pointer)

    result = libopus_decode_float(
        decoder_state,
        opus_data,
        length,
        pcm_pointer,
        frame_size,
        _decode_fec
    )

    if result < 0:
        raise opuslib.exceptions.OpusError(result)

    # Only the first `result * channels` samples of the buffer are returned.
    return array.array('f', pcm[:result * channels]).tobytes()


libopus_ctl = opuslib.api.libopus.opus_decoder_ctl
# No `argtypes` here: the arguments are supplied by the `request` callables.
libopus_ctl.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def decoder_ctl(
        decoder_state: ctypes.Structure,
        request,
        value=None
) -> typing.Union[int, typing.Any]:
    """
    Perform a CTL request on a decoder.

    :param decoder_state: Decoder state, as returned by `create_state`.
    :param request: Callable invoked as
        `request(libopus_ctl, decoder_state[, value])`; it is responsible for
        calling `opus_decoder_ctl` itself.
    :param value: Optional argument forwarded to `request`; omitted from the
        call when it is None.
    :return: Whatever `request` returns.
    """
    if value is not None:
        return request(libopus_ctl, decoder_state, value)
    return request(libopus_ctl, decoder_state)


destroy = opuslib.api.libopus.opus_decoder_destroy
destroy.argtypes = (DecoderPointer,)
destroy.restype = None
destroy.__doc__ = 'Frees an OpusDecoder allocated by opus_decoder_create()'