#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pylint: disable=invalid-name
#

"""
CTypes mapping between libopus functions and Python.
"""

import array
import ctypes  # type: ignore
import typing

import opuslib
import opuslib.api

__author__ = 'Никита Кузнецов <self@svartalf.info>'
__copyright__ = 'Copyright (c) 2012, SvartalF'
__license__ = 'BSD 3-Clause License'


class Encoder(ctypes.Structure):  # pylint: disable=too-few-public-methods
    """Opus encoder state.

    This contains the complete state of an Opus encoder. No fields are
    declared: Python only ever handles pointers to this structure.
    """


EncoderPointer = ctypes.POINTER(Encoder)


libopus_get_size = opuslib.api.libopus.opus_encoder_get_size
libopus_get_size.argtypes = (ctypes.c_int,)  # must be sequence (,) of types!
libopus_get_size.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def get_size(channels: int) -> typing.Union[int, typing.Any]:
    """Gets the size of an OpusEncoder structure.

    Parameters:
        channels: Number of channels; must be 1 or 2.

    Returns the value reported by ``opus_encoder_get_size``.

    Raises:
        ValueError: if ``channels`` is neither 1 nor 2.
    """
    if channels not in (1, 2):
        raise ValueError('Wrong channels value. Must be equal to 1 or 2')
    return libopus_get_size(channels)


libopus_create = opuslib.api.libopus.opus_encoder_create
libopus_create.argtypes = (
    ctypes.c_int,
    ctypes.c_int,
    ctypes.c_int,
    opuslib.api.c_int_pointer
)
libopus_create.restype = EncoderPointer


def create_state(fs: int, channels: int, application: int) -> ctypes.Structure:
    """Allocates and initializes an encoder state.

    Parameters:
        fs: Sampling rate passed to ``opus_encoder_create``.
        channels: Number of channels passed to ``opus_encoder_create``.
        application: Coding mode passed to ``opus_encoder_create``.

    Returns the encoder pointer produced by ``opus_encoder_create``.

    Raises:
        opuslib.OpusError: if libopus reports a status other than
            ``opuslib.OK``; the exception carries that status code.
    """
    result_code = ctypes.c_int()

    result = libopus_create(
        fs,
        channels,
        application,
        ctypes.byref(result_code)
    )

    # Compare by value: ``is not`` on integers only works by accident of
    # CPython's small-integer caching.
    if result_code.value != opuslib.OK:
        raise opuslib.OpusError(result_code.value)

    return result


# No argtypes are declared for the ctl entry point; the ``request`` callable
# handed to encoder_ctl() decides which arguments are passed to it.
libopus_ctl = opuslib.api.libopus.opus_encoder_ctl
libopus_ctl.restype = ctypes.c_int


# FIXME: Remove typing.Any once we have a stub for ctypes
def encoder_ctl(
        encoder_state: ctypes.Structure,
        request,
        value=None
) -> typing.Union[int, typing.Any]:
    """Performs a CTL request on an encoder state.

    Parameters:
        encoder_state: Encoder state the request applies to.
        request: Callable invoked as
            ``request(libopus_ctl, encoder_state[, value])``.
        value: Optional argument for the request. It is forwarded only
            when it is not ``None``.

    Returns whatever the ``request`` callable returns.
    """
    if value is not None:
        return request(libopus_ctl, encoder_state, value)
    return request(libopus_ctl, encoder_state)


libopus_encode = opuslib.api.libopus.opus_encode
libopus_encode.argtypes = (
    EncoderPointer,
    opuslib.api.c_int16_pointer,
    ctypes.c_int,
    ctypes.c_char_p,
    ctypes.c_int32
)
libopus_encode.restype = ctypes.c_int32


# FIXME: Remove typing.Any once we have a stub for ctypes
def encode(
        encoder_state: ctypes.Structure,
        pcm_data: bytes,
        frame_size: int,
        max_data_bytes: int
) -> typing.Union[bytes, typing.Any]:
    """
    Encodes an Opus Frame.

    Returns the encoded payload as bytes.

    Parameters:
    [in]	st	OpusEncoder*: Encoder state
    [in]	pcm	opus_int16*: Input signal (interleaved if 2 channels). length
        is frame_size*channels*sizeof(opus_int16)
    [in]	frame_size	int: Number of samples per channel in the input signal.
        This must be an Opus frame size for the encoder's sampling rate. For
        example, at 48 kHz the permitted values are 120, 240, 480, 960,
        1920, and 2880. Passing in a duration of less than 10 ms
        (480 samples at 48 kHz) will prevent the encoder from using the
        LPC or hybrid modes.
    [out]	data	unsigned char*: Output payload. This must contain storage
        for at least max_data_bytes.
    [in]	max_data_bytes	opus_int32: Size of the allocated memory for the
        output payload. This may be used to impose an upper limit on the
        instant bitrate, but should not be used as the only bitrate control.
        Use OPUS_SET_BITRATE to control the bitrate.

    Raises:
        opuslib.OpusError: if ``opus_encode`` returns a negative value; the
            exception carries that value.
    """
    pcm_pointer = ctypes.cast(pcm_data, opuslib.api.c_int16_pointer)
    # Output buffer that libopus fills with the encoded packet.
    opus_data = (ctypes.c_char * max_data_bytes)()

    result = libopus_encode(
        encoder_state,
        pcm_pointer,
        frame_size,
        opus_data,
        max_data_bytes
    )

    if result < 0:
        # Pass the numeric status, the same way create_state() does, so
        # every OpusError raised by this module carries an integer code.
        raise opuslib.OpusError(result)

    # A non-negative result is the number of payload bytes written.
    return array.array('b', opus_data[:result]).tobytes()


libopus_encode_float = opuslib.api.libopus.opus_encode_float
libopus_encode_float.argtypes = (
    EncoderPointer,
    opuslib.api.c_float_pointer,
    ctypes.c_int,
    ctypes.c_char_p,
    ctypes.c_int32
)
libopus_encode_float.restype = ctypes.c_int32


# FIXME: Remove typing.Any once we have a stub for ctypes
def encode_float(
        encoder_state: ctypes.Structure,
        pcm_data: bytes,
        frame_size: int,
        max_data_bytes: int
) -> typing.Union[bytes, typing.Any]:
    """Encodes an Opus frame from floating point input.

    Parameters:
        encoder_state: Encoder state.
        pcm_data: Input signal; the buffer is reinterpreted as an array of
            C floats.
        frame_size: Frame size passed to ``opus_encode_float``.
        max_data_bytes: Size of the output buffer allocated for the payload.

    Returns the encoded payload as bytes.

    Raises:
        opuslib.OpusError: if ``opus_encode_float`` returns a negative
            value; the exception carries that value.
    """
    pcm_pointer = ctypes.cast(pcm_data, opuslib.api.c_float_pointer)
    # Output buffer that libopus fills with the encoded packet.
    opus_data = (ctypes.c_char * max_data_bytes)()

    result = libopus_encode_float(
        encoder_state,
        pcm_pointer,
        frame_size,
        opus_data,
        max_data_bytes
    )

    if result < 0:
        # Pass the numeric status, the same way create_state() does, so
        # every OpusError raised by this module carries an integer code.
        raise opuslib.OpusError(result)

    # A non-negative result is the number of payload bytes written.
    return array.array('b', opus_data[:result]).tobytes()


destroy = opuslib.api.libopus.opus_encoder_destroy
destroy.argtypes = (EncoderPointer,)  # must be sequence (,) of types!
destroy.restype = None
destroy.__doc__ = "Frees an OpusEncoder allocated by opus_encoder_create()"