1#!/usr/bin/env python3 2# 3# Copyright (C) 2010, Nokia <ivan.frade@nokia.com> 4# Copyright (C) 2018-2019, Sam Thursfield <sam@afuera.me.uk> 5# 6# This program is free software; you can redistribute it and/or 7# modify it under the terms of the GNU General Public License 8# as published by the Free Software Foundation; either version 2 9# of the License, or (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 19# 02110-1301, USA. 20# 21 22from common.utils import configuration as cfg 23from common.utils.helpers import log 24import errno 25import json 26import math 27import os 28import re 29import subprocess 30import unittest as ut 31 32import gi 33gi.require_version('Gst', '1.0') 34from gi.repository import GLib, Gst 35 36 37def get_tracker_extract_jsonld_output(filename, mime_type=None): 38 """ 39 Runs `tracker-extract --file` to extract metadata from a file. 40 """ 41 42 tracker_extract = os.path.join(cfg.TRACKER_EXTRACT_PATH) 43 command = [tracker_extract, '--verbosity=0', '--output-format=json-ld', '--file', str(filename)] 44 if mime_type is not None: 45 command.extend(['--mime', mime_type]) 46 47 # We depend on parsing the output, so verbosity MUST be 0. 48 env = os.environ.copy() 49 env['TRACKER_VERBOSITY'] = '0' 50 # Tell GStreamer not to fork to create the registry 51 env['GST_REGISTRY_FORK'] = 'no' 52 53 log('Running: %s' % ' '.join(command)) 54 try: 55 p = subprocess.Popen(command, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 56 except OSError as e: 57 if e.errno == errno.ENOENT: 58 raise RuntimeError("Did not find tracker-extract binary. Is the 'extract' option disabled?") 59 else: 60 raise RuntimeError("Error running tracker-extract: %s" % (e)) 61 stdout, stderr = p.communicate() 62 63 if p.returncode != 0: 64 raise RuntimeError( 65 "tracker-extract returned non-zero exit code: %s\n" 66 "Error output:\n%s\n" % (p.returncode, stderr.decode('unicode-escape').strip())) 67 68 if len(stderr) > 0: 69 error_output = stderr.decode('unicode-escape').strip() 70 log("Error output from tracker-extract:\n%s" % error_output) 71 72 try: 73 output = stdout.decode('utf-8') 74 75 if len(output.strip()) == 0: 76 raise RuntimeError("tracker-extract didn't return any data.\n" 77 "Error output was: %s" % error_output) 78 79 data = json.loads(output) 80 except ValueError as e: 81 raise RuntimeError("tracker-extract did not return valid JSON data: %s\n" 82 "Output was: %s" % (e, output)) 83 84 return data 85 86 87class TrackerExtractTestCase(ut.TestCase): 88 def assertDictHasKey(self, d, key, msg=None): 89 if not isinstance(d, dict): 90 self.fail("Expected dict, got %s" % d) 91 if key not in d: 92 standardMsg = "Missing: %s" % (key) 93 self.fail(self._formatMessage(msg, standardMsg)) 94 else: 95 return 96 97 def assertIsURN(self, supposed_uuid, msg=None): 98 import uuid 99 100 try: 101 if (supposed_uuid.startswith("<") and supposed_uuid.endswith(">")): 102 supposed_uuid = supposed_uuid[1:-1] 103 104 uuid.UUID(supposed_uuid) 105 except ValueError: 106 standardMsg = "'%s' is not a valid UUID" % (supposed_uuid) 107 self.fail(self._formatMessage(msg, standardMsg)) 108 109 def assert_extract_result_matches_spec(self, spec, result, filename, spec_filename): 110 """ 111 Checks tracker-extract json-ld output against the expected result. 112 113 Use get_tracker_extract_jsonld_output() to get the extractor output. 114 115 Look in test-extraction-data/*/*.expected.json for examples of the spec 116 format. 117 """ 118 119 error_missing_prop = "Property '%s' hasn't been extracted from file \n'%s'\n (requested on '%s')" 120 error_wrong_value = "on property '%s' from file %s\n (requested on: '%s')" 121 error_wrong_length = "Length mismatch on property '%s' from file %s\n (requested on: '%s')" 122 error_extra_prop = "Property '%s' was explicitely banned for file \n'%s'\n (requested on '%s')" 123 error_extra_prop_v = "Property '%s' with value '%s' was explicitely banned for file \n'%s'\n (requested on %s')" 124 125 expected_pairs = [] # List of expected (key, value) 126 unexpected_pairs = [] # List of unexpected (key, value) 127 expected_keys = [] # List of expected keys (the key must be there, value doesnt matter) 128 129 for k, v in list(spec.items()): 130 if k.startswith("!"): 131 unexpected_pairs.append((k[1:], v)) 132 elif k == '@type': 133 expected_keys.append('@type') 134 else: 135 expected_pairs.append((k, v)) 136 137 for prop, expected_value in expected_pairs: 138 self.assertDictHasKey(result, prop, 139 error_missing_prop % (prop, filename, spec_filename)) 140 if expected_value == "@URNUUID@": 141 self.assertIsURN(result[prop][0]['@id'], 142 error_wrong_value % (prop, filename, spec_filename)) 143 else: 144 if isinstance(expected_value, list): 145 if not isinstance(result[prop], list): 146 raise AssertionError("Expected a list property for %s, but got a %s: %s" % ( 147 prop, type(result[prop]).__name__, result[prop])) 148 149 self.assertEqual(len(expected_value), len(result[prop]), 150 error_wrong_length % (prop, filename, spec_filename)) 151 152 for i in range(0, len(expected_value)): 153 if isinstance(expected_value[i], dict): 154 self.assert_extract_result_matches_spec(expected_value[i], result[prop][i], filename, spec_filename) 155 else: 156 self.assertEqual(str(expected_value[i]), str(result[prop][i]), 157 error_wrong_value % (prop, filename, spec_filename)) 158 elif isinstance(expected_value, dict): 159 self.assert_extract_result_matches_spec(expected_value, result[prop], filename, spec_filename) 160 else: 161 self.assertEqual(str(spec[prop]), str(result[prop]), 162 error_wrong_value % (prop, filename, spec_filename)) 163 164 for (prop, value) in unexpected_pairs: 165 # There is no prop, or it is but not with that value 166 if (value == ""): 167 self.assertFalse(prop in result, 168 error_extra_prop % (prop, filename, spec_filename)) 169 else: 170 if (value == "@URNUUID@"): 171 self.assertIsURN(result[prop][0], 172 error_extra_prop % (prop, filename, spec_filename)) 173 else: 174 self.assertNotIn(value, result[prop], 175 error_extra_prop_v % (prop, value, filename, spec_filename)) 176 177 for prop in expected_keys: 178 self.assertDictHasKey(result, prop, 179 error_missing_prop % (prop, filename, spec_filename)) 180 181 182def create_test_flac(path, duration, timeout=10): 183 """ 184 Create a .flac audio file for testing purposes. 185 186 FLAC audio doesn't compress test data particularly efficiently, so 187 committing an audio file more than a few seconds long to Git is not 188 practical. This function creates a .flac file containing a test tone. 189 The 'duration' parameter sets the length in seconds of the time. 190 191 The function is guaranteed to return or raise an exception within the 192 number of seconds given in the 'timeout' parameter. 193 """ 194 195 Gst.init([]) 196 197 num_buffers = math.ceil(duration * 44100 / 1024.0) 198 199 pipeline_src = ' ! '.join([ 200 'audiotestsrc num-buffers=%s samplesperbuffer=1024' % num_buffers, 201 'capsfilter caps="audio/x-raw,rate=44100"', 202 'flacenc', 203 'filesink location=%s' % path, 204 ]) 205 206 log("Running pipeline: %s" % pipeline_src) 207 pipeline = Gst.parse_launch(pipeline_src) 208 ret = pipeline.set_state(Gst.State.PLAYING) 209 210 msg = pipeline.get_bus().poll(Gst.MessageType.ERROR | Gst.MessageType.EOS, 211 timeout * Gst.SECOND) 212 if msg and msg.type == Gst.MessageType.EOS: 213 pass 214 elif msg and msg.type == Gst.MessageType.ERROR: 215 raise RuntimeError(msg.parse_error()) 216 elif msg: 217 raise RuntimeError("Got unexpected GStreamer message %s" % msg.type) 218 else: 219 raise RuntimeError("Timeout generating test audio file after %i seconds" % timeout) 220 221 pipeline.set_state(Gst.State.NULL) 222