1"""Helper utilities for QGIS python unit tests.
2
3.. note:: This program is free software; you can redistribute it and/or modify
4it under the terms of the GNU General Public License as published by
5the Free Software Foundation; either version 2 of the License, or
6(at your option) any later version.
7"""
8__author__ = 'Tim Sutton (tim@linfiniti.com)'
9__date__ = '20/01/2011'
10__copyright__ = 'Copyright 2012, The QGIS Project'
11
12import qgis  # NOQA
13
14import os
15import sys
16import platform
17import tempfile
18import re
19
20try:
21    from urllib2 import urlopen, HTTPError, URLError
22except ImportError:
23    from urllib.request import urlopen, HTTPError, URLError
24
25from qgis.PyQt.QtCore import QDir, QUrl, QUrlQuery
26
27from qgis.core import (
28    QgsCoordinateReferenceSystem,
29    QgsVectorFileWriter,
30    QgsMapSettings,
31    QgsMapRendererParallelJob,
32    QgsMapRendererSequentialJob,
33    QgsFontUtils
34)
35from qgis.testing import start_app
36import hashlib
37
38
39import webbrowser
40import subprocess
41
GEOCRS = 4326  # EPSG code of the geographic CRS (i.e. EPSG:4326)

# Flipped to True by loadTestFonts() once the base test font styles are loaded
FONTSLOADED = False
45
46
def assertHashesForFile(theHashes, theFilename):
    """Assert that the hash of a file is one of several accepted hashes.

    :param theHashes: container of accepted hashes (tested with ``in``)
    :param theFilename: path of the file passed on to hashForFile()
    :raises AssertionError: when the computed hash is not in theHashes
    """
    actualHash = hashForFile(theFilename)
    template = ('Unexpected hash'
                '\nGot: %s'
                '\nExpected: %s'
                '\nPlease check graphics %s visually '
                'and add to list of expected hashes '
                'if it is OK on this platform.')
    failureText = template % (actualHash, theHashes, theFilename)
    assert actualHash in theHashes, failureText
58
59
def assertHashForFile(theHash, theFilename):
    """Assert that the hash of a file equals the single expected hash.

    :param theHash: the expected hash
    :param theFilename: path of the file passed on to hashForFile()
    :raises AssertionError: when the computed hash differs from theHash
    """
    actualHash = hashForFile(theFilename)
    template = ('Unexpected hash'
                '\nGot: %s'
                '\nExpected: %s')
    failureText = template % (actualHash, theHash)
    assert actualHash == theHash, failureText
67
68
def hashForFile(theFilename):
    """Return the md5 checksum of a file's contents.

    The file is read in binary mode: hashlib only accepts bytes, and a text
    mode read would both fail on Python 3 and mangle binary content such as
    images.

    :param theFilename: path of the file to hash
    :returns: hexadecimal md5 digest of the file's bytes
    :rtype: str
    """
    myHash = hashlib.md5()
    with open(theFilename, 'rb') as myFile:
        # feed the digest chunk by chunk so large files are not held in memory
        for myChunk in iter(lambda: myFile.read(65536), b''):
            myHash.update(myChunk)
    return myHash.hexdigest()
77
78
def unitTestDataPath(theSubdir=None):
    """Return the absolute path to the QGIS unit test data dir.

    The 'testdata' directory is resolved three directory levels above this
    module's file.

    Args:
       * theSubdir: (Optional) Additional subdir to add to the path
    """
    moduleDir = os.path.dirname(__file__)
    # climb two further levels above the directory holding this module
    baseDir = os.path.dirname(os.path.dirname(moduleDir))
    parts = [baseDir, 'testdata']
    if theSubdir is not None:
        parts.append(theSubdir)
    return os.path.abspath(os.path.join(*parts))
95
96
def svgSymbolsPath():
    """Return the absolute path of 'images/svg', two levels above the test data dir."""
    relativePath = os.path.join(unitTestDataPath(), '..', '..', 'images', 'svg')
    return os.path.abspath(relativePath)
100
101
def writeShape(theMemoryLayer, theFileName):
    """Write a layer to an ESRI Shapefile inside Qt's temp directory.

    The output is written with 'utf-8' encoding and the EPSG:4326 CRS.
    The full output path is printed before writing.

    :param theMemoryLayer: layer handed to QgsVectorFileWriter
    :param theFileName: file name, joined onto QDir.tempPath()
    :returns: full path of the written file
    :raises AssertionError: when the writer reports anything but NoError
    """
    outputPath = os.path.join(str(QDir.tempPath()), theFileName)
    print(outputPath)

    # All writer arguments are spelled out, not strictly needed but clearer
    encoding = 'utf-8'
    driverName = 'ESRI Shapefile'
    selectedOnly = False
    writerOptions = []
    layerOptions = []
    skipAttributes = False
    geoCrs = QgsCoordinateReferenceSystem('EPSG:4326')

    status, errorText = QgsVectorFileWriter.writeAsVectorFormat(
        theMemoryLayer, outputPath, encoding, geoCrs, driverName,
        selectedOnly, writerOptions, layerOptions, skipAttributes)
    assert status == QgsVectorFileWriter.NoError, 'Writing shape failed, Error {} ({})'.format(status, errorText)

    return outputPath
124
125
def doubleNear(a, b, tol=0.0000000001):
    """
    Tell whether two values, converted to float, differ by strictly less
    than the given tolerance.

    :param a: first value (anything float() accepts)
    :param b: second value (anything float() accepts)
    :param tol: exclusive upper bound on the absolute difference
    :rtype: bool
    """
    difference = float(a) - float(b)
    return abs(difference) < tol
131
132
def compareUrl(a, b):
    """
    Compare two URLs by path and query items.

    The URLs are considered equal when their paths are equal and every query
    item of ``a`` is also present in ``b``. Query items only present in ``b``
    are not checked.

    :param a: first URL, passed to QUrl
    :param b: second URL, passed to QUrl
    :rtype: bool
    """
    first = QUrl(a)
    second = QUrl(b)
    first_items = QUrlQuery(first.query()).queryItems()
    second_items = QUrlQuery(second.query()).queryItems()

    same_path = first.path() == second.path()
    return same_path and all(item in second_items for item in first_items)
145
146
def compareWkt(a, b, tol=0.000001):
    """
    Compares two WKT strings, ignoring allowed differences between strings
    and allowing a tolerance for coordinates.

    Ignored differences are letter case, whitespace before a z/m marker,
    whitespace around brackets and whitespace around commas.

    :param a: first WKT string
    :param b: second WKT string
    :param tol: exclusive upper bound on the absolute difference allowed
        between corresponding numbers
    :returns: True if both strings have the same structure and all
        corresponding numbers are within the tolerance
    :rtype: bool
    """
    # A number is: optional minus, digits, optional fraction, optional
    # exponent. The exponent may be signed ("1e-05", "1E+2"); without that a
    # value in scientific notation is split into pieces and can never match
    # the same value written in plain decimal notation.
    number = re.compile(r"-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?")

    # (pattern, replacement) pairs applied in order to the lowercased strings
    normalizers = (
        (re.compile(r"\s+([zm])"), r'\1'),   # optional spaces before z/m
        (re.compile(r"\s*\(\s*"), '('),      # optional spaces around "("
        (re.compile(r"\s*\)\s*"), ')'),      # optional spaces around ")"
        (number, '#'),                       # numbers are compared separately
        (re.compile(r"\s*,\s*"), ','),       # optional spaces around commas
    )

    def skeleton(wkt):
        # reduce a WKT string to its structure, with '#' standing for numbers
        wkt = wkt.lower()
        for pattern, replacement in normalizers:
            wkt = pattern.sub(replacement, wkt)
        return wkt

    # compare the structure
    if skeleton(a) != skeleton(b):
        return False

    # compare the numbers with given tolerance
    numbers_a = number.findall(a)
    numbers_b = number.findall(b)
    if len(numbers_a) != len(numbers_b):
        return False

    # same strict "less than tol" test as doubleNear()
    return all(abs(float(x) - float(y)) < tol
               for x, y in zip(numbers_a, numbers_b))
189
190
def getTempfilePath(sufx='png'):
    """
    Create an empty temporary file and hand back its path.

    The file is not removed automatically; the caller should delete it when
    it is not used.

    :param sufx: file extension, without the leading dot
    :returns: Path to empty tempfile ending in defined suffix
    """
    extension = ".{0}".format(sufx)
    # delete=False keeps the file on disk after the handle is closed
    with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as handle:
        return handle.name
201
202
def renderMapToImage(mapsettings, parallel=False):
    """
    Render a map to an image, blocking until the render job has finished.

    :param QgsMapSettings mapsettings: settings handed to the render job
    :param bool parallel: use QgsMapRendererParallelJob when true,
        QgsMapRendererSequentialJob otherwise
    :rtype: QImage
    """
    job_class = QgsMapRendererParallelJob if parallel else QgsMapRendererSequentialJob
    render_job = job_class(mapsettings)
    render_job.start()
    render_job.waitForFinished()
    return render_job.renderedImage()
218
219
def mapSettingsString(ms):
    """
    Build a multi-line, human readable dump of a map settings object.

    :param QgsMapSettings ms: the settings to describe
    :returns: text starting with 'MapSettings...' followed by one indented
        entry per reported property
    :rtype: str
    """
    # Fetched before anything else is formatted so that no extra call lands
    # in the middle of the output flow.
    # NOTE(review): the 'fullExtent():' entry below is filled from
    # visibleExtent(), not fullExtent() -- confirm this is intended.
    full_ext = ms.visibleExtent().toString()

    chunks = ['MapSettings...\n']

    chunks.append('  layers(): {0}\n'.format(
        [layer.name() for layer in ms.layers()]))

    background = ms.backgroundColor()
    chunks.append('  backgroundColor(): rgba {0},{1},{2},{3}\n'.format(
        background.red(), background.green(),
        background.blue(), background.alpha()))

    selection = ms.selectionColor()
    chunks.append('  selectionColor(): rgba {0},{1},{2},{3}\n'.format(
        selection.red(), selection.green(),
        selection.blue(), selection.alpha()))

    size = ms.outputSize()
    chunks.append('  outputSize(): {0} x {1}\n'.format(
        size.width(), size.height()))

    chunks.append('  outputDpi(): {0}\n'.format(ms.outputDpi()))
    chunks.append('  mapUnits(): {0}\n'.format(ms.mapUnits()))
    chunks.append('  scale(): {0}\n'.format(ms.scale()))
    chunks.append('  mapUnitsPerPixel(): {0}\n'.format(ms.mapUnitsPerPixel()))

    # extents are printed with each ' : ' separated part on its own line
    extent_text = ms.extent().toString()
    chunks.append('  extent():\n    {0}\n'.format(
        extent_text.replace(' : ', '\n    ')))
    visible_text = ms.visibleExtent().toString()
    chunks.append('  visibleExtent():\n    {0}\n'.format(
        visible_text.replace(' : ', '\n    ')))
    chunks.append('  fullExtent():\n    {0}\n'.format(
        full_ext.replace(' : ', '\n    ')))

    chunks.append('  destinationCrs(): {0}\n'.format(
        ms.destinationCrs().authid()))

    chunks.append('  flag.Antialiasing: {0}\n'.format(
        ms.testFlag(QgsMapSettings.Antialiasing)))
    chunks.append('  flag.UseAdvancedEffects: {0}\n'.format(
        ms.testFlag(QgsMapSettings.UseAdvancedEffects)))
    chunks.append('  flag.ForceVectorOutput: {0}\n'.format(
        ms.testFlag(QgsMapSettings.ForceVectorOutput)))
    chunks.append('  flag.DrawLabeling: {0}\n'.format(
        ms.testFlag(QgsMapSettings.DrawLabeling)))
    chunks.append('  flag.DrawEditingInfo: {0}\n'.format(
        ms.testFlag(QgsMapSettings.DrawEditingInfo)))

    chunks.append('  outputImageFormat(): {0}\n'.format(
        ms.outputImageFormat()))

    return ''.join(chunks)
262
263
def getExecutablePath(exe):
    """
    Look up an executable in the directories listed in the PATH environment
    variable.

    On Windows, when the PATHEXT environment variable is set, each of its
    extensions is also tried after the bare name.

    :param exe: Name of executable, e.g. lighttpd
    :returns: Path to executable (including the matched extension, if any),
        or an empty string if it was not found or PATH is not set
    """
    search_path = os.environ.get("PATH")
    if search_path is None:
        # nothing to search; report "not found" instead of raising KeyError
        return ''

    exe_exts = []
    if (platform.system().lower().startswith('win') and
            "PATHEXT" in os.environ):
        exe_exts = os.environ["PATHEXT"].split(os.pathsep)

    for path in search_path.split(os.pathsep):
        exe_path = os.path.join(path, exe)
        if os.path.exists(exe_path):
            return exe_path
        for ext in exe_exts:
            candidate = exe_path + ext
            if os.path.exists(candidate):
                # return the file that actually exists, extension included
                return candidate
    return ''
282
283
def getTestFontFamily():
    """Return the standard test font family reported by QgsFontUtils."""
    family = QgsFontUtils.standardTestFontFamily()
    return family
286
287
def getTestFont(style='Roman', size=12):
    """Return the standard test font in the given style and size.

    The test fonts are loaded first if that has not happened yet.
    Only Roman and Bold are loaded by default
    Others available: Oblique, Bold Oblique
    """
    fontsReady = FONTSLOADED
    if not fontsReady:
        loadTestFonts()
    testFont = QgsFontUtils.getStandardTestFont(style, size)
    return testFont
295
296
def loadTestFonts():
    """Load the 'Roman' and 'Bold' standard test font styles once.

    start_app() is called on every invocation; the fonts themselves are only
    loaded while FONTSLOADED is still False, and the flag is set to True
    after both styles have been verified.

    :raises AssertionError: when one of the two styles is not available
        after loading
    """
    global FONTSLOADED  # pylint: disable=W0603

    start_app()

    if FONTSLOADED is not False:
        return

    QgsFontUtils.loadStandardTestFonts(['Roman', 'Bold'])
    msg = getTestFontFamily() + ' base test font styles could not be loaded'
    stylesPresent = all(
        QgsFontUtils.fontFamilyHasStyle(getTestFontFamily(), styleName)
        for styleName in ('Roman', 'Bold'))
    assert stylesPresent, msg
    FONTSLOADED = True
308
309
def openInBrowserTab(url):
    """Open a URL in a new tab of the default web browser.

    On Windows and macOS the browser is opened directly. Elsewhere it is
    opened from a separate Python process that is not waited for.

    :param url: the URL to open
    """
    if sys.platform[:3] in ('win', 'dar'):
        webbrowser.open_new_tab(url)
    else:
        # some Linux OS pause execution on webbrowser open, so background it.
        # The URL is passed as a command line argument rather than pasted
        # into the code string, so quotes or backslashes in it can neither
        # break nor alter the executed code.
        cmd = 'import sys, webbrowser;' \
              'webbrowser.open_new_tab(sys.argv[1])'
        subprocess.Popen([sys.executable, "-c", cmd, url],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
320
321
def printImportant(info):
    """
    Print important information to stdout and append it, as one line, to
    'ctest-important.log' in the system temp directory. The log is meant to
    be printed on test result pages in the end.

    :param info: A string to print
    """
    print(info)

    logPath = os.path.join(tempfile.gettempdir(), 'ctest-important.log')
    with open(logPath, 'a+') as logFile:
        logFile.write('{}\n'.format(info))
332
333
def waitServer(url, timeout=10):
    r""" Wait for a server to be online and to respond

        The URL is requested repeatedly until the server answers or the
        timeout expires. HTTP errors are ignored: an HTTP error status still
        means the server responded. Any other failure (connection refused,
        socket timeout, ...) means the server is not ready yet and the
        request is retried.

        \param url: URL to request
        \param timeout: in seconds
        \return: True if the server responded, False if the timeout expired
    """
    from time import sleep, time as now
    end = now() + timeout
    while True:
        try:
            urlopen(url, timeout=1).close()
            return True
        except HTTPError:
            # the server answered, albeit with an error status
            return True
        except Exception:
            # includes URLError: the server cannot be reached (yet)
            if now() > end:
                return False
            # short pause so retries do not spin the CPU
            sleep(0.1)
351