# -*- coding: utf-8 -*-
# Pitivi video editor
# Copyright (c) 2005, Edward Hervey <bilboed@bilboed.com>
# Copyright (c) 2011, Benjamin M. Schwartz <bens@alum.mit.edu>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
"""
Classes for extracting decoded contents of streams into Python

Code derived from ui/previewer.py.
"""
# FIXME reimplement after GES port
from collections import deque

from gi.repository import Gst

from pitivi.utils.loggable import Loggable
# NOTE(review): SingleDecodeBin and ExtractionSink are used further down but
# the imports below are commented out, so unless those names are provided
# some other way, instantiating the RandomAccess* classes raises NameError
# until the port mentioned in the FIXME above is done.
# from pitivi.elements.singledecodebin import SingleDecodeBin
# from pitivi.elements.extractionsink import ExtractionSink


def linkDynamic(element, target):
    """
    Link C{element} to C{target} once C{element} exposes a suitable pad.

    A "pad-added" handler is connected on C{element}; each time a pad
    appears, a compatible pad is looked up on C{target} and, if one is
    found, the two pads are linked without any link-time checks.

    @param element: the element whose pads appear dynamically
    @param target: the element to link the new pads to
    """

    def pad_added(unused_bin, pad, target):
        compatpad = target.get_compatible_pad(pad)
        if compatpad:
            # GI-style flag name, consistent with the other Gst enums and
            # flags used in this module (Gst.SeekFlags, Gst.State, ...).
            pad.link_full(compatpad, Gst.PadLinkCheck.NOTHING)
    element.connect("pad-added", pad_added, target)


def pipeline(graph):
    """
    Build a L{Gst.Pipeline} from a mapping describing element links.

    Every key of C{graph} is added to the pipeline. For each
    C{source: sink} entry with a truthy sink, a static link is attempted;
    if it cannot be made right away, the link is deferred with
    L{linkDynamic} until the source exposes a pad.

    @param graph: mapping of each element to the element it feeds
        (or C{None} for terminal elements)
    @type graph: C{dict}
    @return: the pipeline containing all the elements of C{graph}
    """
    p = Gst.Pipeline()
    p.add(*graph)
    for u, v in graph.items():
        if not v:
            continue
        # A failed link may be reported either through the return value
        # or through Gst.LinkError; treat both as "link later".
        try:
            linked = u.link(v)
        except Gst.LinkError:
            linked = False
        if not linked:
            linkDynamic(u, v)
    return p


class Extractee:

    """Abstract base class for receiving raw data from an L{Extractor}."""

    def receive(self, array):
        """
        Receive a chunk of data from an Extractor.

        @param array: The chunk of data as an array
        @type array: any kind of numeric array

        """
        raise NotImplementedError

    def finalize(self):
        """
        Inform the Extractee that receive() will not be called again.

        Indicates that the extraction is complete, so the Extractee should
        process the data it has received.

        """
        raise NotImplementedError


class Extractor(Loggable):

    """
    Abstract base class for extraction of raw data from a stream.

    Closely modeled on L{Previewer}.

    """

    def __init__(self, factory, stream_):
        """
        Create a new Extractor.

        @param factory: the factory with which to decode the stream
        @type factory: L{ObjectFactory}
        @param stream_: the stream to decode
        @type stream_: L{Stream}
        """
        Loggable.__init__(self)
        self.debug("Initialized with %s %s", factory, stream_)

    def extract(self, extractee, start, duration):
        """
        Extract the raw data corresponding to a segment of the stream.

        @param extractee: the L{Extractee} that will receive the raw data
        @type extractee: L{Extractee}
        @param start: The point in the stream at which the segment starts
            (nanoseconds)
        @type start: L{long}
        @param duration: The duration of the segment (nanoseconds)
        @type duration: L{long}

        """
        raise NotImplementedError


class RandomAccessExtractor(Extractor):

    """
    Abstract class for L{Extractor}s of random access streams.

    Closely inspired by L{RandomAccessPreviewer}.

    """

    def __init__(self, factory, stream_):
        """
        Create the decoding bin for the stream and set up the pipeline.

        @param factory: the factory with which to decode the stream;
            its C{uri} attribute is read
        @type factory: L{ObjectFactory}
        @param stream_: the stream to decode; its C{caps} attribute is read
        @type stream_: L{Stream}
        """
        Extractor.__init__(self, factory, stream_)
        # FIXME:
        # why doesn't this work?
        # bin = factory.makeBin(stream_)
        uri = factory.uri
        caps = stream_.caps
        sbin = SingleDecodeBin(uri=uri, caps=caps, stream=stream_)

        self._pipelineInit(factory, sbin)

    def _pipelineInit(self, factory, bin):
        """
        Create the pipeline for the extraction process.

        Subclasses should override this method and create a pipeline,
        connecting callbacks to the appropriate signals, and prerolling
        the pipeline if necessary.

        @param factory: the factory passed to the constructor
        @param bin: the decoding bin created for the stream

        """
        raise NotImplementedError


class RandomAccessAudioExtractor(RandomAccessExtractor):

    """
    L{Extractor} for random access audio streams.

    Closely inspired by L{RandomAccessAudioPreviewer}.

    """

    def __init__(self, factory, stream_):
        """
        Create the extractor and start prerolling its pipeline.

        @param factory: the factory with which to decode the stream
        @type factory: L{ObjectFactory}
        @param stream_: the stream to decode
        @type stream_: L{Stream}
        """
        # Pending (extractee, start, duration) requests, oldest first.
        self._queue = deque()
        # Both attributes must exist before the base constructor runs,
        # because it calls _pipelineInit() which registers the callback
        # that flips _ready to True; assigning False afterwards could
        # overwrite that.
        self._ready = False
        RandomAccessExtractor.__init__(self, factory, stream_)

    def _pipelineInit(self, factory, sbin):
        """
        Build the audio extraction pipeline and start prerolling it.

        The pipeline is sbin -> audiorate -> audioconvert -> queue ->
        extraction sink.

        @param factory: the factory passed to the constructor (unused here)
        @param sbin: the decoding bin feeding the pipeline
        """
        self.audioSink = ExtractionSink()
        self.audioSink.set_stopped_cb(self._finishSegment)
        # This audiorate element ensures that the extracted raw-data
        # timeline matches the timestamps used for seeking, even if the
        # audio source has gaps or other timestamp abnormalities.
        audiorate = Gst.ElementFactory.make("audiorate")
        conv = Gst.ElementFactory.make("audioconvert")
        q = Gst.ElementFactory.make("queue")
        self.audioPipeline = pipeline({
            sbin: audiorate,
            audiorate: conv,
            conv: q,
            q: self.audioSink,
            self.audioSink: None})
        bus = self.audioPipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message::error", self._busMessageErrorCb)
        self._donecb_id = bus.connect("message::async-done",
                                      self._busMessageAsyncDoneCb)

        self.audioPipeline.set_state(Gst.State.PAUSED)
        # The audioPipeline.set_state() method does not take effect
        # immediately, but the extraction process (and in particular
        # self._startSegment) will not work properly until
        # self.audioPipeline reaches the desired state (State.PAUSED).
        # To ensure that this is the case, we wait until the ASYNC_DONE
        # message is received before setting self._ready = True,
        # which enables extraction to proceed.

    def _busMessageErrorCb(self, unused_bus, message):
        """Log an error message posted on the pipeline's bus."""
        error, debug = message.parse_error()
        self.error("Event bus error: %s; %s", error, debug)

        return Gst.BusSyncReply.PASS

    def _busMessageAsyncDoneCb(self, bus, unused_message):
        """
        Mark the extractor as ready and start any queued extraction.

        The handler disconnects itself so that it runs only once.
        """
        self.debug("Pipeline is ready for seeking")
        bus.disconnect(self._donecb_id)  # Don't call me again
        self._ready = True
        if self._queue:  # Someone called .extract() before we were ready
            self._run()

    def _startSegment(self, timestamp, duration):
        """
        Seek to the requested segment and set the pipeline to PLAYING.

        @param timestamp: start of the segment (nanoseconds)
        @param duration: duration of the segment (nanoseconds)
        @return: the result of the seek request
        """
        self.debug("processing segment with timestamp=%i and duration=%i",
                   timestamp, duration)
        res = self.audioPipeline.seek(1.0,
                                      Gst.Format.TIME,
                                      Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
                                      Gst.SeekType.SET, timestamp,
                                      Gst.SeekType.SET, timestamp + duration)
        if not res:
            self.warning("seek failed %s", timestamp)
        self.audioPipeline.set_state(Gst.State.PLAYING)

        return res

    def _finishSegment(self):
        """
        Finish the current request and move on to the next one, if any.

        Registered as the stopped callback of the extraction sink.
        """
        self.audioSink.extractee.finalize()
        self.audioSink.reset()
        self._queue.popleft()
        # If there's more to do, keep running
        if self._queue:
            self._run()

    def extract(self, extractee, start, duration):
        """
        Queue the extraction of a segment of the stream.

        @param extractee: the L{Extractee} that will receive the raw data
        @type extractee: L{Extractee}
        @param start: The point in the stream at which the segment starts
            (nanoseconds)
        @type start: L{long}
        @param duration: The duration of the segment (nanoseconds)
        @type duration: L{long}
        """
        stopped = not self._queue
        self._queue.append((extractee, start, duration))
        if stopped and self._ready:
            self._run()
        # if self._ready is False, self._run() will be called from
        # self._busMessageAsyncDoneCb().

    def _run(self):
        """Start processing the request at the head of the queue."""
        # Control flows in a cycle:
        # _run -> _startSegment -> (audioSink stopped callback)
        #      -> _finishSegment -> _run
        # This forms a loop that extracts an entire segment (i.e. satisfies an
        # extract request) in each cycle. The cycle
        # runs until the queue of Extractees empties. If the cycle is not
        # running, extract() will kick it off again.
        extractee, start, duration = self._queue[0]
        self.audioSink.set_extractee(extractee)
        self._startSegment(start, duration)