1# -*- coding: utf-8 -*-
2# Pitivi video editor
3# Copyright (c) 2005, Edward Hervey <bilboed@bilboed.com>
4# Copyright (c) 2011, Benjamin M. Schwartz <bens@alum.mit.edu>
5#
6# This program is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2.1 of the License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this program; if not, write to the
18# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19# Boston, MA 02110-1301, USA.
20"""
21Classes for extracting decoded contents of streams into Python
22
23Code derived from ui/previewer.py.
24"""
25# FIXME reimplement after GES port
26from collections import deque
27
28from gi.repository import Gst
29
30from pitivi.utils.loggable import Loggable
31# from pitivi.elements.singledecodebin import SingleDecodeBin
32# from pitivi.elements.extractionsink import ExtractionSink
33
34
def linkDynamic(element, target):
    """
    Link C{element} to C{target} whenever C{element} exposes a new pad.

    A "pad-added" handler is connected on C{element}. Each time it fires,
    C{target} is asked for a pad compatible with the new pad; if it has
    one, the two pads are linked. Pads for which C{target} has no
    compatible pad are left unlinked.

    @param element: the element whose pads appear dynamically
    @param target: the element to link the new pads to
    """

    def pad_added(unused_bin, pad, target):
        compatpad = target.get_compatible_pad(pad)
        if compatpad:
            # Gst.PadLinkCheck.NOTHING is the GObject-introspection name of
            # the flag that pygst 0.10 exposed as PAD_LINK_CHECK_NOTHING;
            # it skips the link-time checks.
            pad.link_full(compatpad, Gst.PadLinkCheck.NOTHING)
    element.connect("pad-added", pad_added, target)
42
43
def pipeline(graph):
    """
    Build a C{Gst.Pipeline} from a mapping that describes element links.

    Every key of C{graph} is added to the pipeline. Each key is then
    linked to its value, unless the value is false (e.g. C{None}), which
    marks an element without a downstream peer. Values that are not also
    keys are not added to the pipeline.

    @param graph: maps each element to the element it feeds, or to a
        false value
    @type graph: C{dict}
    @return: the pipeline holding all keys of C{graph}
    """
    pipe = Gst.Pipeline()
    for element in graph:
        pipe.add(element)

    for upstream, downstream in graph.items():
        if not downstream:
            continue
        # A link that cannot be made right away is reported either through
        # a false return value or through Gst.LinkError, depending on the
        # bindings in use; both cases fall back to linking on "pad-added".
        try:
            linked = upstream.link(downstream)
        except Gst.LinkError:
            linked = False
        if not linked:
            linkDynamic(upstream, downstream)
    return pipe
56
57
class Extractee:

    """Interface implemented by consumers of raw data from an L{Extractor}.

    Both methods are abstract and raise C{NotImplementedError} here.
    """

    def receive(self, array):
        """
        Accept one chunk of extracted data.

        @param array: the chunk of data, as an array
        @type array: any kind of numeric array

        """
        raise NotImplementedError()

    def finalize(self):
        """
        Signal that no further call to receive() will be made.

        The extraction is over at this point, so the Extractee should
        process whatever data it has been given.

        """
        raise NotImplementedError()
81
82
class Extractor(Loggable):

    """
    Abstract base class for pulling the raw data out of a stream.

    Closely modeled on L{Previewer}.

    """

    def __init__(self, factory, stream_):
        """
        Initialize logging and log the construction arguments.

        Neither argument is stored by this base class.

        @param factory: the factory with which to decode the stream
        @type factory: L{ObjectFactory}
        @param stream_: the stream to decode
        @type stream_: L{Stream}
        """
        Loggable.__init__(self)
        self.debug("Initialized with %s %s", factory, stream_)

    def extract(self, extractee, start, duration):
        """
        Deliver the raw data of one segment of the stream to an extractee.

        Abstract: subclasses must override this method.

        @param extractee: the L{Extractee} that will receive the raw data
        @type extractee: L{Extractee}
        @param start: where the segment begins in the stream (nanoseconds)
        @type start: L{long}
        @param duration: how long the segment lasts (nanoseconds)
        @type duration: L{long}

        """
        raise NotImplementedError()
118
119
class RandomAccessExtractor(Extractor):

    """
    Abstract base for L{Extractor}s that work on random access streams.

    Closely inspired by L{RandomAccessPreviewer}.

    """

    def __init__(self, factory, stream_):
        """
        Build a decoding bin for the stream and pass it to _pipelineInit().

        @param factory: the factory with which to decode the stream; its
            C{uri} attribute is read
        @param stream_: the stream to decode; its C{caps} attribute is read
        """
        Extractor.__init__(self, factory, stream_)
        # FIXME:
        # Asking the factory for the bin, i.e. factory.makeBin(stream_),
        # did not work for an unknown reason, so the bin is built by hand.
        # NOTE(review): the import of SingleDecodeBin is commented out at
        # the top of this file, so this constructor cannot run until the
        # module is reimplemented.
        source_uri = factory.uri
        stream_caps = stream_.caps
        decode_bin = SingleDecodeBin(uri=source_uri, caps=stream_caps,
                                     stream=stream_)

        self._pipelineInit(factory, decode_bin)

    def _pipelineInit(self, factory, bin):
        """
        Build the pipeline used for the extraction process.

        Abstract hook called at the end of __init__(). Overrides are
        expected to assemble a pipeline around C{bin}, connect callbacks
        to the appropriate signals and preroll the pipeline if necessary.

        @param factory: the factory given to the constructor
        @param bin: the decoding bin created by the constructor
        """
        raise NotImplementedError()
151
152
class RandomAccessAudioExtractor(RandomAccessExtractor):

    """
    L{Extractor} for random access audio streams.

    Closely inspired by L{RandomAccessAudioPreviewer}.

    Requests made through extract() are stored in a FIFO queue and served
    one at a time; the request being served stays at the head of the
    queue until _finishSegment() removes it.

    """

    def __init__(self, factory, stream_):
        """
        Create the extractor and start prerolling its pipeline.

        @param factory: the factory with which to decode the stream
        @param stream_: the stream to decode
        """
        # The base constructor calls _pipelineInit(), which registers
        # _busMessageAsyncDoneCb. That callback reads self._queue and sets
        # self._ready = True, so both attributes get their initial values
        # first; assigning self._ready afterwards could overwrite a
        # readiness notification that was already delivered.
        self._queue = deque()
        self._ready = False
        RandomAccessExtractor.__init__(self, factory, stream_)

    def _pipelineInit(self, factory, sbin):
        """
        Assemble sbin -> audiorate -> audioconvert -> queue -> sink.

        Also watches the pipeline bus for error and async-done messages
        and asks the pipeline to go to PAUSED.

        @param factory: unused here
        @param sbin: the decoding bin that feeds the pipeline
        """
        # NOTE(review): the import of ExtractionSink is commented out at
        # the top of this file, so this method cannot run until the module
        # is reimplemented.
        self.audioSink = ExtractionSink()
        self.audioSink.set_stopped_cb(self._finishSegment)
        # This audiorate element ensures that the extracted raw-data
        # timeline matches the timestamps used for seeking, even if the
        # audio source has gaps or other timestamp abnormalities.
        audiorate = Gst.ElementFactory.make("audiorate")
        conv = Gst.ElementFactory.make("audioconvert")
        q = Gst.ElementFactory.make("queue")
        self.audioPipeline = pipeline({
            sbin: audiorate,
            audiorate: conv,
            conv: q,
            q: self.audioSink,
            self.audioSink: None})
        bus = self.audioPipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message::error", self._busMessageErrorCb)
        # The handler id is kept so that the callback can disconnect
        # itself after its first invocation.
        self._donecb_id = bus.connect("message::async-done",
                                      self._busMessageAsyncDoneCb)

        self.audioPipeline.set_state(Gst.State.PAUSED)
        # The audioPipeline.set_state() method does not take effect
        # immediately, but the extraction process (and in particular
        # self._startSegment) will not work properly until
        # self.audioPipeline reaches the desired state (State.PAUSED).
        # To ensure that this is the case, we wait until the ASYNC_DONE
        # message is received before setting self._ready = True,
        # which enables extraction to proceed.

    def _busMessageErrorCb(self, unused_bus, message):
        """Log an error message posted on the pipeline bus."""
        error, debug = message.parse_error()
        self.error("Event bus error: %s; %s", error, debug)

        return Gst.BusSyncReply.PASS

    def _busMessageAsyncDoneCb(self, bus, unused_message):
        """
        Mark the extractor as ready and start serving queued requests.

        Runs once only: the handler disconnects itself from C{bus}.
        """
        self.debug("Pipeline is ready for seeking")
        bus.disconnect(self._donecb_id)  # Don't call me again
        self._ready = True
        if self._queue:  # Someone called .extract() before we were ready
            self._run()

    def _startSegment(self, timestamp, duration):
        """
        Seek to a segment and set the pipeline to PLAYING.

        The pipeline is set to PLAYING even when the seek fails; the
        failure is only logged as a warning.

        @param timestamp: start of the segment (nanoseconds)
        @param duration: length of the segment (nanoseconds)
        @return: the result of the seek call
        """
        self.debug("processing segment with timestamp=%i and duration=%i",
                   timestamp, duration)
        res = self.audioPipeline.seek(1.0,
                                      Gst.Format.TIME,
                                      Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
                                      Gst.SeekType.SET, timestamp,
                                      Gst.SeekType.SET, timestamp + duration)
        if not res:
            self.warning("seek failed %s", timestamp)
        self.audioPipeline.set_state(Gst.State.PLAYING)

        return res

    def _finishSegment(self):
        """
        Complete the request at the head of the queue.

        Registered as the "stopped" callback of the sink. Finalizes the
        current extractee, resets the sink, drops the request from the
        queue and starts the next request if there is one.
        """
        self.audioSink.extractee.finalize()
        self.audioSink.reset()
        self._queue.popleft()
        # If there's more to do, keep running
        if self._queue:
            self._run()

    def extract(self, extractee, start, duration):
        """
        Queue the extraction of a segment of the stream.

        @param extractee: the L{Extractee} that will receive the raw data
        @type extractee: L{Extractee}
        @param start: The point in the stream at which the segment starts
            (nanoseconds)
        @type start: L{long}
        @param duration: The duration of the segment (nanoseconds)
        @type duration: L{long}
        """
        # An empty queue means that no extraction cycle is running.
        stopped = not self._queue
        self._queue.append((extractee, start, duration))
        if stopped and self._ready:
            self._run()
        # if self._ready is False, self._run() will be called from
        # self._busMessageAsyncDoneCb().

    def _run(self):
        """Start serving the request at the head of the queue."""
        # Control flows in a cycle:
        # _run -> _startSegment -> (sink "stopped" callback)
        #      -> _finishSegment -> _run
        # This forms a loop that extracts an entire segment (i.e. satisfies an
        # extract request) in each cycle. The cycle
        # runs until the queue of Extractees empties.  If the cycle is not
        # running, extract() will kick it off again.
        extractee, start, duration = self._queue[0]
        self.audioSink.set_extractee(extractee)
        self._startSegment(start, duration)
250