1#!/usr/bin/env python3
2#
3# Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
4# Copyright (C) 2018-2019, Sam Thursfield <sam@afuera.me.uk>
5#
6# This program is free software; you can redistribute it and/or
7# modify it under the terms of the GNU General Public License
8# as published by the Free Software Foundation; either version 2
9# of the License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19# 02110-1301, USA.
20#
21
22from common.utils import configuration as cfg
23from common.utils.helpers import log
24import errno
25import json
26import math
27import os
28import re
29import subprocess
30import unittest as ut
31
32import gi
33gi.require_version('Gst', '1.0')
34from gi.repository import GLib, Gst
35
36
def get_tracker_extract_jsonld_output(filename, mime_type=None):
    """
    Runs `tracker-extract --file` to extract metadata from a file.

    The extractor is invoked with `--output-format=json-ld` and whatever it
    prints on stdout is parsed as JSON and returned.

    :param filename: file to extract; converted to a string with str().
    :param mime_type: optional value passed to the extractor via `--mime`.
    :returns: the parsed JSON document printed by tracker-extract.
    :raises RuntimeError: if the binary cannot be started, exits with a
        non-zero status, prints nothing on stdout, or prints something that
        is not valid UTF-8 encoded JSON.
    """

    tracker_extract = os.fspath(cfg.TRACKER_EXTRACT_PATH)
    command = [tracker_extract, '--verbosity=0', '--output-format=json-ld', '--file', str(filename)]
    if mime_type is not None:
        command.extend(['--mime', mime_type])

    # We depend on parsing the output, so verbosity MUST be 0.
    env = os.environ.copy()
    env['TRACKER_VERBOSITY'] = '0'
    # Tell GStreamer not to fork to create the registry
    env['GST_REGISTRY_FORK'] = 'no'

    log('Running: %s' % ' '.join(command))
    try:
        p = subprocess.Popen(command, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        if e.errno == errno.ENOENT:
            raise RuntimeError(
                "Did not find tracker-extract binary. Is the 'extract' option disabled?") from e
        raise RuntimeError("Error running tracker-extract: %s" % (e)) from e
    stdout, stderr = p.communicate()

    # Decode stderr once, up front, so that every error path below can quote
    # it. errors='replace' guarantees that reporting a failure can never fail
    # itself because of undecodable bytes in the error stream.
    error_output = stderr.decode('utf-8', errors='replace').strip()

    if p.returncode != 0:
        raise RuntimeError(
            "tracker-extract returned non-zero exit code: %s\n"
            "Error output:\n%s\n" % (p.returncode, error_output))

    if len(stderr) > 0:
        log("Error output from tracker-extract:\n%s" % error_output)

    # UnicodeDecodeError is a ValueError subclass; handle it separately from
    # JSON errors because no decoded text exists yet to put in the message.
    try:
        output = stdout.decode('utf-8')
    except UnicodeDecodeError as e:
        raise RuntimeError("tracker-extract did not return valid JSON data: %s\n"
                           "Output was: %s" % (e, stdout)) from e

    if not output.strip():
        raise RuntimeError("tracker-extract didn't return any data.\n"
                           "Error output was: %s" % error_output)

    try:
        data = json.loads(output)
    except ValueError as e:
        raise RuntimeError("tracker-extract did not return valid JSON data: %s\n"
                           "Output was: %s" % (e, output)) from e

    return data
85
86
class TrackerExtractTestCase(ut.TestCase):
    """
    Test case base class with assertions for comparing tracker-extract
    JSON-LD output against an expected-result spec.
    """

    def assertDictHasKey(self, d, key, msg=None):
        """
        Fail unless `d` is a dict that contains `key`.

        `msg` is combined with the default "Missing: ..." message when the
        key is absent.
        """
        if not isinstance(d, dict):
            # Wrap in a 1-tuple: a tuple-valued `d` would otherwise be taken
            # as the list of format arguments and raise TypeError.
            self.fail("Expected dict, got %s" % (d,))
        if key not in d:
            standardMsg = "Missing: %s" % (key,)
            self.fail(self._formatMessage(msg, standardMsg))

    def assertIsURN(self, supposed_uuid, msg=None):
        """
        Fail unless `supposed_uuid` can be parsed by uuid.UUID().

        One pair of enclosing angle brackets ("<...>") is stripped before
        parsing. Values that are not strings are reported as assertion
        failures rather than raising an unrelated exception.
        """
        import uuid

        try:
            if supposed_uuid.startswith("<") and supposed_uuid.endswith(">"):
                supposed_uuid = supposed_uuid[1:-1]

            uuid.UUID(supposed_uuid)
        except (AttributeError, TypeError, ValueError):
            standardMsg = "'%s' is not a valid UUID" % (supposed_uuid,)
            self.fail(self._formatMessage(msg, standardMsg))

    def assert_extract_result_matches_spec(self, spec, result, filename, spec_filename):
        """
        Checks tracker-extract json-ld output against the expected result.

        Use get_tracker_extract_jsonld_output() to get the extractor output.

        Look in test-extraction-data/*/*.expected.json for examples of the spec
        format.

        How the keys of `spec` are interpreted:

          * "@type": the key must be present in `result`; its value is not
            compared.
          * "!name": the property `name` is banned. With an empty string
            value it must be absent altogether; otherwise the given value
            must not be contained in `result[name]` (an absent property
            satisfies this too).
          * anything else: the property must be present and match. Lists are
            compared element by element, dicts are checked recursively, the
            special value "@URNUUID@" requires `result[name][0]['@id']` to
            be a UUID, and all other values are compared via str().

        `filename` and `spec_filename` are only used in failure messages.
        """

        error_missing_prop = "Property '%s' hasn't been extracted from file \n'%s'\n (requested on '%s')"
        error_wrong_value = "on property '%s' from file %s\n (requested on: '%s')"
        error_wrong_length = "Length mismatch on property '%s' from file %s\n (requested on: '%s')"
        error_extra_prop = "Property '%s' was explicitely banned for file \n'%s'\n (requested on '%s')"
        error_extra_prop_v = "Property '%s' with value '%s' was explicitely banned for file \n'%s'\n (requested on %s')"

        expected_pairs = []  # List of expected (key, value)
        unexpected_pairs = []  # List of unexpected (key, value)
        expected_keys = []  # List of expected keys (the key must be there, value doesnt matter)

        for k, v in spec.items():
            if k.startswith("!"):
                unexpected_pairs.append((k[1:], v))
            elif k == '@type':
                expected_keys.append('@type')
            else:
                expected_pairs.append((k, v))

        for prop, expected_value in expected_pairs:
            self.assertDictHasKey(result, prop,
                                  error_missing_prop % (prop, filename, spec_filename))
            actual_value = result[prop]

            if expected_value == "@URNUUID@":
                self.assertIsURN(actual_value[0]['@id'],
                                 error_wrong_value % (prop, filename, spec_filename))
            elif isinstance(expected_value, list):
                if not isinstance(actual_value, list):
                    raise AssertionError("Expected a list property for %s, but got a %s: %s" % (
                        prop, type(actual_value).__name__, actual_value))

                self.assertEqual(len(expected_value), len(actual_value),
                                 error_wrong_length % (prop, filename, spec_filename))

                # Lengths are known to be equal here, so zip() drops nothing.
                for expected_item, actual_item in zip(expected_value, actual_value):
                    if isinstance(expected_item, dict):
                        self.assert_extract_result_matches_spec(
                            expected_item, actual_item, filename, spec_filename)
                    else:
                        self.assertEqual(str(expected_item), str(actual_item),
                                         error_wrong_value % (prop, filename, spec_filename))
            elif isinstance(expected_value, dict):
                self.assert_extract_result_matches_spec(
                    expected_value, actual_value, filename, spec_filename)
            else:
                self.assertEqual(str(expected_value), str(actual_value),
                                 error_wrong_value % (prop, filename, spec_filename))

        for prop, value in unexpected_pairs:
            # There is no prop, or it is but not with that value
            if value == "":
                self.assertNotIn(prop, result,
                                 error_extra_prop % (prop, filename, spec_filename))
            elif value == "@URNUUID@":
                # NOTE(review): for a banned property this asserts that the
                # first value IS a UUID, and it requires the property to be
                # present. Kept as-is; confirm the intended semantics.
                self.assertIsURN(result[prop][0],
                                 error_extra_prop % (prop, filename, spec_filename))
            elif prop in result:
                # An absent property trivially satisfies "not with that
                # value", so only inspect the value when the key exists.
                self.assertNotIn(value, result[prop],
                                 error_extra_prop_v % (prop, value, filename, spec_filename))

        for prop in expected_keys:
            self.assertDictHasKey(result, prop,
                                  error_missing_prop % (prop, filename, spec_filename))
180
181
def create_test_flac(path, duration, timeout=10):
    """
    Create a .flac audio file for testing purposes.

    FLAC audio doesn't compress test data particularly efficiently, so
    committing an audio file more than a few seconds long to Git is not
    practical. This function creates a .flac file containing a test tone.
    The 'duration' parameter sets the length in seconds of the tone.

    The function waits at most 'timeout' seconds for the GStreamer pipeline
    to report end-of-stream or an error.

    :param path: output location, inserted verbatim into the pipeline
        description (NOTE(review): it is not quoted, so a path containing
        whitespace presumably will not parse as intended -- confirm).
    :param duration: length of the generated tone, in seconds.
    :param timeout: maximum number of seconds to wait for the pipeline.
    :raises RuntimeError: if the pipeline posts an error or an unexpected
        message, or if no message arrives before the timeout expires.
    """

    Gst.init([])

    sample_rate = 44100
    samples_per_buffer = 1024
    # Round up so that the generated audio is never shorter than requested.
    num_buffers = math.ceil(duration * sample_rate / float(samples_per_buffer))

    pipeline_src = ' ! '.join([
        'audiotestsrc num-buffers=%s samplesperbuffer=%s' % (num_buffers, samples_per_buffer),
        'capsfilter caps="audio/x-raw,rate=%s"' % sample_rate,
        'flacenc',
        'filesink location=%s' % path,
    ])

    log("Running pipeline: %s" % pipeline_src)
    pipeline = Gst.parse_launch(pipeline_src)
    try:
        pipeline.set_state(Gst.State.PLAYING)

        # Gst.SECOND is an integer; cast so a float timeout stays an integer
        # when handed to poll().
        msg = pipeline.get_bus().poll(Gst.MessageType.ERROR | Gst.MessageType.EOS,
                                      int(timeout * Gst.SECOND))
        if not msg:
            raise RuntimeError("Timeout generating test audio file after %i seconds" % timeout)
        if msg.type == Gst.MessageType.ERROR:
            raise RuntimeError(msg.parse_error())
        if msg.type != Gst.MessageType.EOS:
            raise RuntimeError("Got unexpected GStreamer message %s" % msg.type)
    finally:
        # Always shut the pipeline down, including on the error and timeout
        # paths, so that its resources are released.
        pipeline.set_state(Gst.State.NULL)
222