1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# pylint: disable=invalid-name,too-few-public-methods
4#
5
6"""
7CTypes mapping between libopus functions and Python.
8"""
9
10import array
11import ctypes  # type: ignore
12import typing
13
14import opuslib
15import opuslib.api
16
17__author__ = 'Никита Кузнецов <self@svartalf.info>'
18__copyright__ = 'Copyright (c) 2012, SvartalF'
19__license__ = 'BSD 3-Clause License'
20
21
22class Decoder(ctypes.Structure):
23    """Opus decoder state.
24    This contains the complete state of an Opus decoder.
25    """
26    pass
27
28
29DecoderPointer = ctypes.POINTER(Decoder)
30
31
32libopus_get_size = opuslib.api.libopus.opus_decoder_get_size
33libopus_get_size.argtypes = (ctypes.c_int,)
34libopus_get_size.restype = ctypes.c_int
35libopus_get_size.__doc__ = 'Gets the size of an OpusDecoder structure'
36
37
38libopus_create = opuslib.api.libopus.opus_decoder_create
39libopus_create.argtypes = (
40    ctypes.c_int,
41    ctypes.c_int,
42    opuslib.api.c_int_pointer
43)
44libopus_create.restype = DecoderPointer
45
46
47def create_state(fs: int, channels: int) -> ctypes.Structure:
48    """
49    Allocates and initializes a decoder state.
50    Wrapper for C opus_decoder_create()
51
52    `fs` must be one of 8000, 12000, 16000, 24000, or 48000.
53
54    Internally Opus stores data at 48000 Hz, so that should be the default
55    value for Fs. However, the decoder can efficiently decode to buffers
56    at 8, 12, 16, and 24 kHz so if for some reason the caller cannot use data
57    at the full sample rate, or knows the compressed data doesn't use the full
58    frequency range, it can request decoding at a reduced rate. Likewise, the
59    decoder is capable of filling in either mono or interleaved stereo pcm
60    buffers, at the caller's request.
61
62    :param fs: Sample rate to decode at (Hz).
63    :param int: Number of channels (1 or 2) to decode.
64    """
65    result_code = ctypes.c_int()
66
67    decoder_state = libopus_create(
68        fs,
69        channels,
70        ctypes.byref(result_code)
71    )
72
73    if result_code.value is not 0:
74        raise opuslib.exceptions.OpusError(result_code.value)
75
76    return decoder_state
77
78
79libopus_packet_get_bandwidth = opuslib.api.libopus.opus_packet_get_bandwidth
80# `argtypes` must be a sequence (,) of types!
81libopus_packet_get_bandwidth.argtypes = (ctypes.c_char_p,)
82libopus_packet_get_bandwidth.restype = ctypes.c_int
83
84
85# FIXME: Remove typing.Any once we have a stub for ctypes
86def packet_get_bandwidth(data: bytes) -> typing.Union[int, typing.Any]:
87    """Gets the bandwidth of an Opus packet."""
88    data_pointer = ctypes.c_char_p(data)
89
90    result = libopus_packet_get_bandwidth(data_pointer)
91
92    if result < 0:
93        raise opuslib.exceptions.OpusError(result)
94
95    return result
96
97
98libopus_packet_get_nb_channels = opuslib.api.libopus.opus_packet_get_nb_channels  # NOQA
99# `argtypes` must be a sequence (,) of types!
100libopus_packet_get_nb_channels.argtypes = (ctypes.c_char_p,)
101libopus_packet_get_nb_channels.restype = ctypes.c_int
102
103
104# FIXME: Remove typing.Any once we have a stub for ctypes
105def packet_get_nb_channels(data: bytes) -> typing.Union[int, typing.Any]:
106    """Gets the number of channels from an Opus packet"""
107    data_pointer = ctypes.c_char_p(data)
108
109    result = libopus_packet_get_nb_channels(data_pointer)
110
111    if result < 0:
112        raise opuslib.exceptions.OpusError(result)
113
114    return result
115
116
117libopus_packet_get_nb_frames = opuslib.api.libopus.opus_packet_get_nb_frames
118libopus_packet_get_nb_frames.argtypes = (ctypes.c_char_p, ctypes.c_int)
119libopus_packet_get_nb_frames.restype = ctypes.c_int
120
121
122# FIXME: Remove typing.Any once we have a stub for ctypes
123def packet_get_nb_frames(
124        data: bytes,
125        length: typing.Optional[int] = None
126) -> typing.Union[int, typing.Any]:
127    """Gets the number of frames in an Opus packet"""
128    data_pointer = ctypes.c_char_p(data)
129
130    if length is None:
131        length = len(data)
132
133    result = libopus_packet_get_nb_frames(data_pointer, ctypes.c_int(length))
134
135    if result < 0:
136        raise opuslib.exceptions.OpusError(result)
137
138    return result
139
140
141libopus_packet_get_samples_per_frame = \
142    opuslib.api.libopus.opus_packet_get_samples_per_frame
143libopus_packet_get_samples_per_frame.argtypes = (ctypes.c_char_p, ctypes.c_int)
144libopus_packet_get_samples_per_frame.restype = ctypes.c_int
145
146
147# FIXME: Remove typing.Any once we have a stub for ctypes
148def packet_get_samples_per_frame(
149        data: bytes,
150        fs: int
151) -> typing.Union[int, typing.Any]:
152    """Gets the number of samples per frame from an Opus packet"""
153    data_pointer = ctypes.c_char_p(data)
154
155    result = libopus_packet_get_nb_frames(data_pointer, ctypes.c_int(fs))
156
157    if result < 0:
158        raise opuslib.exceptions.OpusError(result)
159
160    return result
161
162
163libopus_get_nb_samples = opuslib.api.libopus.opus_decoder_get_nb_samples
164libopus_get_nb_samples.argtypes = (
165    DecoderPointer,
166    ctypes.c_char_p,
167    ctypes.c_int32
168)
169libopus_get_nb_samples.restype = ctypes.c_int
170
171
172# FIXME: Remove typing.Any once we have a stub for ctypes
173def get_nb_samples(
174        decoder_state: ctypes.Structure,
175        packet: bytes,
176        length: int
177) -> typing.Union[int, typing.Any]:
178    """
179    Gets the number of samples of an Opus packet.
180
181    Parameters
182    [in]	dec	OpusDecoder*: Decoder state
183    [in]	packet	char*: Opus packet
184    [in]	len	opus_int32: Length of packet
185
186    Returns
187    Number of samples
188
189    Return values
190    OPUS_BAD_ARG	Insufficient data was passed to the function
191    OPUS_INVALID_PACKET	The compressed data passed is corrupted or of an
192        unsupported type
193    """
194    result = libopus_get_nb_samples(decoder_state, packet, length)
195
196    if result < 0:
197        raise opuslib.exceptions.OpusError(result)
198
199    return result
200
201
202libopus_decode = opuslib.api.libopus.opus_decode
203libopus_decode.argtypes = (
204    DecoderPointer,
205    ctypes.c_char_p,
206    ctypes.c_int32,
207    opuslib.api.c_int16_pointer,
208    ctypes.c_int,
209    ctypes.c_int
210)
211libopus_decode.restype = ctypes.c_int
212
213
214# FIXME: Remove typing.Any once we have a stub for ctypes
215def decode(  # pylint: disable=too-many-arguments
216        decoder_state: ctypes.Structure,
217        opus_data: bytes,
218        length: int,
219        frame_size: int,
220        decode_fec: bool,
221        channels: int = 2
222) -> typing.Union[bytes, typing.Any]:
223    """
224    Decode an Opus Frame to PCM.
225
226    Unlike the `opus_decode` function , this function takes an additional
227    parameter `channels`, which indicates the number of channels in the frame.
228    """
229    _decode_fec = int(decode_fec)
230    result: int = 0
231
232    pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_int16)
233    pcm = (ctypes.c_int16 * pcm_size)()
234    pcm_pointer = ctypes.cast(pcm, opuslib.api.c_int16_pointer)
235
236    result = libopus_decode(
237        decoder_state,
238        opus_data,
239        length,
240        pcm_pointer,
241        frame_size,
242        _decode_fec
243    )
244
245    if result < 0:
246        raise opuslib.exceptions.OpusError(result)
247
248    return array.array('h', pcm_pointer[:result * channels]).tobytes()
249
250
251libopus_decode_float = opuslib.api.libopus.opus_decode_float
252libopus_decode_float.argtypes = (
253    DecoderPointer,
254    ctypes.c_char_p,
255    ctypes.c_int32,
256    opuslib.api.c_float_pointer,
257    ctypes.c_int,
258    ctypes.c_int
259)
260libopus_decode_float.restype = ctypes.c_int
261
262
263# FIXME: Remove typing.Any once we have a stub for ctypes
264def decode_float(  # pylint: disable=too-many-arguments
265        decoder_state: ctypes.Structure,
266        opus_data: bytes,
267        length: int,
268        frame_size: int,
269        decode_fec: bool,
270        channels: int = 2
271) -> typing.Union[bytes, typing.Any]:
272    """
273    Decode an Opus Frame.
274
275    Unlike the `opus_decode` function , this function takes an additional
276    parameter `channels`, which indicates the number of channels in the frame.
277    """
278    _decode_fec = int(decode_fec)
279
280    pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_float)
281    pcm = (ctypes.c_float * pcm_size)()
282    pcm_pointer = ctypes.cast(pcm, opuslib.api.c_float_pointer)
283
284    result = libopus_decode_float(
285        decoder_state,
286        opus_data,
287        length,
288        pcm_pointer,
289        frame_size,
290        _decode_fec
291    )
292
293    if result < 0:
294        raise opuslib.exceptions.OpusError(result)
295
296    return array.array('f', pcm[:result * channels]).tobytes()
297
298
299libopus_ctl = opuslib.api.libopus.opus_decoder_ctl
300libopus_ctl.restype = ctypes.c_int
301
302
303# FIXME: Remove typing.Any once we have a stub for ctypes
304def decoder_ctl(
305        decoder_state: ctypes.Structure,
306        request,
307        value=None
308) -> typing.Union[int, typing.Any]:
309    if value is not None:
310        return request(libopus_ctl, decoder_state, value)
311    return request(libopus_ctl, decoder_state)
312
313
314destroy = opuslib.api.libopus.opus_decoder_destroy
315destroy.argtypes = (DecoderPointer,)
316destroy.restype = None
317destroy.__doc__ = 'Frees an OpusDecoder allocated by opus_decoder_create()'
318