1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# pylint: disable=invalid-name
4#
5
6"""
7CTypes mapping between libopus functions and Python.
8"""
9
10import array
11import ctypes  # type: ignore
12import typing
13
14import opuslib
15import opuslib.api
16
17__author__ = 'Никита Кузнецов <self@svartalf.info>'
18__copyright__ = 'Copyright (c) 2012, SvartalF'
19__license__ = 'BSD 3-Clause License'
20
21
22class Encoder(ctypes.Structure):  # pylint: disable=too-few-public-methods
23    """Opus encoder state.
24    This contains the complete state of an Opus encoder.
25    """
26    pass
27
28
29EncoderPointer = ctypes.POINTER(Encoder)
30
31
32libopus_get_size = opuslib.api.libopus.opus_encoder_get_size
33libopus_get_size.argtypes = (ctypes.c_int,)  # must be sequence (,) of types!
34libopus_get_size.restype = ctypes.c_int
35
36
37# FIXME: Remove typing.Any once we have a stub for ctypes
38def get_size(channels: int) -> typing.Union[int, typing.Any]:
39    """Gets the size of an OpusEncoder structure."""
40    if channels not in (1, 2):
41        raise ValueError('Wrong channels value. Must be equal to 1 or 2')
42    return libopus_get_size(channels)
43
44
45libopus_create = opuslib.api.libopus.opus_encoder_create
46libopus_create.argtypes = (
47    ctypes.c_int,
48    ctypes.c_int,
49    ctypes.c_int,
50    opuslib.api.c_int_pointer
51)
52libopus_create.restype = EncoderPointer
53
54
55def create_state(fs: int, channels: int, application: int) -> ctypes.Structure:
56    """Allocates and initializes an encoder state."""
57    result_code = ctypes.c_int()
58
59    result = libopus_create(
60        fs,
61        channels,
62        application,
63        ctypes.byref(result_code)
64    )
65
66    if result_code.value is not opuslib.OK:
67        raise opuslib.OpusError(result_code.value)
68
69    return result
70
71
72libopus_ctl = opuslib.api.libopus.opus_encoder_ctl
73libopus_ctl.restype = ctypes.c_int
74
75
76# FIXME: Remove typing.Any once we have a stub for ctypes
77def encoder_ctl(
78        encoder_state: ctypes.Structure,
79        request,
80        value=None
81) -> typing.Union[int, typing.Any]:
82    if value is not None:
83        return request(libopus_ctl, encoder_state, value)
84    return request(libopus_ctl, encoder_state)
85
86
87libopus_encode = opuslib.api.libopus.opus_encode
88libopus_encode.argtypes = (
89    EncoderPointer,
90    opuslib.api.c_int16_pointer,
91    ctypes.c_int,
92    ctypes.c_char_p,
93    ctypes.c_int32
94)
95libopus_encode.restype = ctypes.c_int32
96
97
98# FIXME: Remove typing.Any once we have a stub for ctypes
99def encode(
100        encoder_state: ctypes.Structure,
101        pcm_data: bytes,
102        frame_size: int,
103        max_data_bytes: int
104) -> typing.Union[bytes, typing.Any]:
105    """
106    Encodes an Opus Frame.
107
108    Returns string output payload.
109
110    Parameters:
111    [in]	st	OpusEncoder*: Encoder state
112    [in]	pcm	opus_int16*: Input signal (interleaved if 2 channels). length
113        is frame_size*channels*sizeof(opus_int16)
114    [in]	frame_size	int: Number of samples per channel in the input signal.
115        This must be an Opus frame size for the encoder's sampling rate. For
116            example, at 48 kHz the permitted values are 120, 240, 480, 960,
117            1920, and 2880. Passing in a duration of less than 10 ms
118            (480 samples at 48 kHz) will prevent the encoder from using the
119            LPC or hybrid modes.
120    [out]	data	unsigned char*: Output payload. This must contain storage
121        for at least max_data_bytes.
122    [in]	max_data_bytes	opus_int32: Size of the allocated memory for the
123        output payload. This may be used to impose an upper limit on the
124        instant bitrate, but should not be used as the only bitrate control.
125        Use OPUS_SET_BITRATE to control the bitrate.
126    """
127    pcm_pointer = ctypes.cast(pcm_data, opuslib.api.c_int16_pointer)
128    opus_data = (ctypes.c_char * max_data_bytes)()
129
130    result = libopus_encode(
131        encoder_state,
132        pcm_pointer,
133        frame_size,
134        opus_data,
135        max_data_bytes
136    )
137
138    if result < 0:
139        raise opuslib.OpusError(
140            'Opus Encoder returned result="{}"'.format(result))
141
142    return array.array('b', opus_data[:result]).tobytes()
143
144
145libopus_encode_float = opuslib.api.libopus.opus_encode_float
146libopus_encode_float.argtypes = (
147    EncoderPointer,
148    opuslib.api.c_float_pointer,
149    ctypes.c_int,
150    ctypes.c_char_p,
151    ctypes.c_int32
152)
153libopus_encode_float.restype = ctypes.c_int32
154
155
156# FIXME: Remove typing.Any once we have a stub for ctypes
157def encode_float(
158        encoder_state: ctypes.Structure,
159        pcm_data: bytes,
160        frame_size: int,
161        max_data_bytes: int
162) -> typing.Union[bytes, typing.Any]:
163    """Encodes an Opus frame from floating point input"""
164    pcm_pointer = ctypes.cast(pcm_data, opuslib.api.c_float_pointer)
165    opus_data = (ctypes.c_char * max_data_bytes)()
166
167    result = libopus_encode_float(
168        encoder_state,
169        pcm_pointer,
170        frame_size,
171        opus_data,
172        max_data_bytes
173    )
174
175    if result < 0:
176        raise opuslib.OpusError(
177            'Encoder returned result="{}"'.format(result))
178
179    return array.array('b', opus_data[:result]).tobytes()
180
181
182destroy = opuslib.api.libopus.opus_encoder_destroy
183destroy.argtypes = (EncoderPointer,)  # must be sequence (,) of types!
184destroy.restype = None
185destroy.__doc__ = "Frees an OpusEncoder allocated by opus_encoder_create()"
186