# -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
### BEGIN LICENSE
# Copyright (c) 2012, Peter Levi <peterlevi@peterlevi.com>
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
### END LICENSE
import base64
import codecs
import datetime
import functools
import gettext
import hashlib
import json
import logging
import os
import random
import re
import shutil
import string
import subprocess
import sys
import threading
import time
import urllib.parse
from itertools import cycle

import bs4
import requests
from PIL import Image

from variety_lib import get_version

# fmt: off
import gi  # isort:skip
gi.require_version("GExiv2", "0.10")
gi.require_version("PangoCairo", "1.0")
gi.require_version('Gdk', '3.0')
from gi.repository import Gdk, GdkPixbuf, GExiv2, Gio, GLib, Pango  # isort:skip
# fmt: on


USER_AGENT = "Variety Wallpaper Changer " + get_version()

random.seed()
# NOTE(review): many calls below pass a zero-argument lambda instead of a string as the
# log message; presumably the "variety" logger is configured elsewhere to evaluate
# callables lazily - confirm before replacing them with plain strings.
logger = logging.getLogger("variety")
gettext.textdomain("variety")


def _(text):
    """Returns the translated form of text.

    Empty or whitespace-only text is returned unchanged, without a gettext lookup.
    """
    if not text or not text.strip():
        return text
    return gettext.gettext(text)


def debounce(seconds):
    """
    Decorator that will postpone a function's execution until `seconds` seconds
    have elapsed since the last time it was invoked.

    The decorated function runs on a threading.Timer thread and its return value is discarded.
    :param seconds: quiet period in seconds that must pass after the last invocation
    """

    def decorator(fn):
        @functools.wraps(fn)
        def debounced(*args, **kwargs):
            def call_it():
                fn(*args, **kwargs)

            # Cancel the pending timer, if any; on the very first call there is none yet
            try:
                debounced.t.cancel()
            except AttributeError:
                pass
            debounced.t = threading.Timer(seconds, call_it)
            debounced.t.start()

        return debounced

    return decorator


class throttle(object):
    """
    Decorator that prevents a function from being called more than once every time period.
    Allows for a trailing call.

    To create a function that cannot be called more than once a second:

        @throttle(seconds=1)
        def my_fun():
            pass
    """

    def __init__(self, seconds=0, trailing_call=False):
        """
        seconds - throttle interval in seconds
        trailing_call - if True, a call suppressed by the throttle is re-scheduled on a timer
        to run once the interval since the last executed call has elapsed
        """
        self.seconds = seconds
        self.trailing_call = trailing_call
        self.time_of_last_call = 0
        self.timer = None

    def __call__(self, fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # A newer invocation supersedes any trailing call that is still pending
            if self.timer is not None:
                self.timer.cancel()

            def call_it():
                self.time_of_last_call = time.time()
                return fn(*args, **kwargs)

            seconds_since_last_call = time.time() - self.time_of_last_call
            if seconds_since_last_call >= self.seconds:
                return call_it()
            elif self.trailing_call:
                self.timer = threading.Timer(self.seconds - seconds_since_last_call, call_it)
                self.timer.start()

        return wrapper


def cache(ttl_seconds=100 * 365 * 24 * 3600, debug=False):
    """
    caching decorator with TTL. Keep in mind the cache is per-process.
    Results are keyed by the positional arguments, which therefore must be hashable;
    keyword arguments are not supported by the decorated function.
    TODO: There is no process for cache invalidation now. Introduce memcached and use it instead.
    :param ttl_seconds: TTL in seconds before the cache entry expires
    :param debug: use True to log cache hits (with DEBUG level)
    """

    def decorate(f):
        _cache = {}

        @functools.wraps(f)
        def decorated(*args):
            cached = _cache.get(args)
            if not cached or cached["timestamp"] < datetime.datetime.now() - datetime.timedelta(
                seconds=ttl_seconds
            ):
                cached = {"timestamp": datetime.datetime.now(), "result": f(*args)}
                _cache[args] = cached
            elif debug:
                logger.debug(lambda: "@cache hit for %s" % str(args))
            return cached["result"]

        return decorated

    return decorate


class VarietyMetadata(GExiv2.Metadata):
    """
    GExiv2.Metadata subclass with dict-style item access that picks the tag accessor
    (multiple / long / string) based on the tag name.
    """

    # Tags read and written as lists of values
    MULTIPLES = {
        "Iptc.Application2.Headline",
        "Iptc.Application2.Keywords",
        "Xmp.dc.creator",
        "Xmp.dc.subject",
    }

    # Tags read and written as integers
    NUMBERS = {
        "Xmp.variety.sfwRating",
        "Xmp.xmp.Rating",
        "Exif.Image.Rating",
        "Exif.Image.RatingPercent",
    }

    def __init__(self, path):
        """Opens the metadata of the file at path and registers the "variety" XMP namespace."""
        super(VarietyMetadata, self).__init__(path=path)
        self.register_xmp_namespace("https://launchpad.net/variety/", "variety")

    def __getitem__(self, key):
        """Returns the value of tag key; raises KeyError when the tag is not present."""
        if self.has_tag(key):
            if key in self.MULTIPLES:
                return self.get_tag_multiple(key)
            elif key in self.NUMBERS:
                return self.get_tag_long(key)
            else:
                return self.get_tag_string(key)
        else:
            raise KeyError("%s: Unknown tag" % key)

    def __setitem__(self, key, value):
        """Sets tag key to value using the accessor matching the tag's kind."""
        if key in self.MULTIPLES:
            self.set_tag_multiple(key, value)
        elif key in self.NUMBERS:
            self.set_tag_long(key, value)
        else:
            self.set_tag_string(key, value)


class ModuleProfiler:
    """Profile function that logs function entries and exits for a set of target paths."""

    # How deep in other modules' code we should profile
    MAX_NONTARGET_DEPTH = 1

    def __init__(self):
        """
        Initializes the module profiler.
        """
        self.target_paths = []

        # Track how far deep we are in functions outside our target packages
        # The intent is to only log the first call to outside methods without following them further
        self.nontarget_depths = {}

    def log_class(self, cls):
        """
        Adds the given class' module to the list of modules to be profiled.
        """
        modulename = cls.__module__
        if modulename not in sys.modules:
            logger.error(
                "ModuleProfiler: Could not add module %r (class %s) to the list of modules to trace - "
                "has it been imported entirely?",
                modulename,
                cls,
            )
            return

        module = sys.modules[modulename]

        self.log_module(module, request=cls)

    def log_module(self, module, request=None):
        """
        Adds the given module to the list of modules to be profiled.
        """
        self.log_path(module.__file__, request=request)

    def log_path(self, path, request=None):
        """
        Adds the given module path to the list of profile targets.
        """
        self.target_paths.append(path)
        # Results memoized by is_target_path were computed against the previous target list
        # and may now be wrong, so drop them
        self.is_target_path.cache_clear()

        logger.info(
            "ModuleProfiler: added path %s to list of profile targets (request=%s)", path, request
        )

    @functools.lru_cache(maxsize=2048)
    def is_target_path(self, path):
        """
        Returns whether the given path matches one of our modules to be profiled.
        A path matches when it equals a target, or lies under a target that is a directory.
        """
        for target in self.target_paths:
            if os.path.isdir(target) and path.startswith(target + os.path.sep):
                return True
            elif path == target:
                return True
        return False

    def start(self):
        """
        Starts the module profiler for all future threads.
        """
        threading.setprofile(self.profiler)

    def stop(self):
        """
        Removes the module profiler globally and from future threads.
        """
        if sys.getprofile() != self.profiler:
            logger.warning(
                "ModuleProfiler: The currently enabled profile function was not ours - unbinding anyways"
            )
        threading.setprofile(None)
        sys.setprofile(None)

    def profiler(self, frame, event, arg):
        """
        Profile function (frame, event, arg) that logs "call" and "return" events at DEBUG
        level, skipping frames deeper than MAX_NONTARGET_DEPTH outside the target paths.
        """
        filename = frame.f_code.co_filename

        tid = threading.get_ident()

        if not self.is_target_path(filename):
            if tid not in self.nontarget_depths:
                # Pick up where the main thread left off
                self.nontarget_depths[tid] = self.nontarget_depths.get(
                    threading.main_thread().ident, 1
                )
            else:
                self.nontarget_depths[tid] += 1
        else:
            self.nontarget_depths[tid] = 0

        tname = threading.current_thread().name

        if event == "call":
            if self.nontarget_depths[tid] > self.MAX_NONTARGET_DEPTH:
                # Don't log past our max depth for packages that we're not tracking
                return
            else:
                # In order: function name, line number, filename
                s = "[%s] -> Entering function: %s\t(line %s in %s)" % (
                    tname,
                    frame.f_code.co_name,
                    frame.f_lineno,
                    filename,
                )
                if self.nontarget_depths[tid] == self.MAX_NONTARGET_DEPTH:
                    s += (
                        " - not tracing further because MAX_NONTARGET_DEPTH=%s"
                        % self.MAX_NONTARGET_DEPTH
                    )
                logger.debug(s)

        elif event == "return":
            if self.nontarget_depths[tid] > self.MAX_NONTARGET_DEPTH:
                return

            logger.debug(
                "[%s] -> Leaving function: %s\t(line %s in %s)"
                % (tname, frame.f_code.co_name, frame.f_lineno, filename)
            )


class Util:
    """Namespace of static helper functions."""

    @staticmethod
    def sanitize_filename(filename):
        """Replaces with "_" every character that is not an ASCII letter, digit or one of " ,.!-+@()_"."""
        valid_chars = " ,.!-+@()_%s%s" % (string.ascii_letters, string.digits)
        return "".join(c if c in valid_chars else "_" for c in filename)

    @staticmethod
    def get_local_name(url, ensure_image=True):
        """
        Derives a local file name from the last path segment of url.
        The query string and fragment are dropped, the name is URL-unquoted, sanitized and
        limited to 200 characters.
        :param ensure_image: when True, ".jpg" is appended unless Util.is_image accepts the name
        """
        filename = url[url.rfind("/") + 1 :]
        index = filename.find("?")
        if index > 0:
            filename = filename[:index]
        index = filename.find("#")
        if index > 0:
            filename = filename[:index]

        filename = urllib.parse.unquote_plus(filename)

        filename = Util.sanitize_filename(filename)

        if len(filename) > 200:
            # Keep the tail too, so that the extension survives the truncation
            filename = filename[:190] + filename[-10:]

        if ensure_image and not Util.is_image(filename):
            filename += ".jpg"

        return filename

    @staticmethod
    def split(s, seps=(",", " ")):
        """Splits s on whitespace and then on each of seps, stripping pieces and dropping empty ones."""
        result = s.split()
        for sep in seps:
            result = [x.strip() for y in result for x in y.split(sep) if x.strip()]
        return result

    @staticmethod
    def makedirs(path):
        """Creates folder path (with parents) if it is not a directory yet. OSError is logged, not raised."""
        try:
            if not os.path.isdir(path):
                logger.info(lambda: "Creating folder %s" % path)
                os.makedirs(path)
        except OSError:
            logger.exception(lambda: "Could not makedirs for %s" % path)

    @staticmethod
    def is_image(filename, check_contents=False):
        """
        Returns whether filename is a (non-animated) image.
        :param check_contents: when False only the extension is checked, when True the file is
        probed with GdkPixbuf.Pixbuf.get_file_info
        """
        if Util.is_animated_gif(filename):
            return False

        if not check_contents:
            return filename.lower().endswith(
                (".jpg", ".jpeg", ".gif", ".png", ".tiff", ".svg", ".bmp")
            )
        else:
            image_format, image_width, image_height = GdkPixbuf.Pixbuf.get_file_info(filename)
            return bool(image_format)

    @staticmethod
    def is_animated_gif(filename):
        """
        Returns True when filename ends in ".gif" and has more than one frame.
        Files with the .gif extension are opened with PIL; errors from opening them propagate.
        """
        if not filename.lower().endswith(".gif"):
            return False

        # The context manager makes sure the underlying file handle is closed
        with Image.open(filename) as gif:
            try:
                gif.seek(1)
            except EOFError:
                return False
            else:
                return True

    @staticmethod
    def list_files(
        files=(), folders=(), filter_func=(lambda f: True), max_files=10000, randomize=True
    ):
        """
        Generator over the paths in files that are readable and accepted by filter_func,
        followed by the accepted files found while walking folders (following symlinks).
        :param filter_func: predicate taking a file path
        :param max_files: listing stops once more than this many files have been counted while
        walking the folders
        :param randomize: when True, folders and each walked directory's entries are shuffled
        """
        count = 0
        for filepath in files:
            logger.debug(
                lambda: "checking file %s against filter_func %s" % (filepath, filter_func)
            )
            if filter_func(filepath) and os.access(filepath, os.R_OK):
                count += 1
                yield filepath

        folders = list(folders)
        if randomize:
            random.shuffle(folders)

        for folder in folders:
            if os.path.isdir(folder):
                try:
                    for root, subFolders, files in os.walk(folder, followlinks=True):
                        if randomize:
                            # Shuffling subFolders in place also randomizes os.walk's descent order
                            random.shuffle(files)
                            random.shuffle(subFolders)
                        for filename in files:
                            logger.debug(
                                lambda: "checking file %s against filter_func %s (root=%s)"
                                % (filename, filter_func, root)
                            )
                            path = os.path.join(root, filename)
                            if filter_func(path):
                                count += 1
                                if count > max_files:
                                    logger.info(
                                        lambda: "More than %d files in the folders, stop listing"
                                        % max_files
                                    )
                                    return
                                yield path
                except Exception:
                    logger.exception(lambda: "Could not walk folder " + folder)

    @staticmethod
    def start_force_exit_thread(delay):
        """Starts a daemon thread that sends signal 9 to this process after delay seconds."""

        def force_exit():
            time.sleep(delay)
            print("Exiting takes too long. Calling os.kill.")
            os.kill(os.getpid(), 9)

        force_exit_thread = threading.Thread(target=force_exit)
        force_exit_thread.daemon = True
        force_exit_thread.start()

    @staticmethod
    def write_metadata(filename, info):
        """
        Writes the info dict as metadata inside the file; if that fails, writes it as JSON to
        filename + ".metadata.json" instead.
        :return: True when either way succeeded, False otherwise
        """
        try:
            m = VarietyMetadata(filename)
            for k, v in sorted(info.items()):
                if k == "author":
                    m["Xmp.variety." + k] = v
                    # Do not overwrite a creator that is already present in the file
                    if "Xmp.dc.creator" not in m:
                        m["Xmp.dc.creator"] = [v]
                elif k == "headline":
                    m["Iptc.Application2.Headline"] = [v]
                elif k == "description":
                    if v is not None:
                        m.set_comment(v)
                    else:
                        m.clear_comment()
                elif k == "keywords":
                    if not isinstance(v, (list, tuple)):
                        v = [v]
                    m["Iptc.Application2.Keywords"] = v
                    m["Xmp.dc.subject"] = v
                elif k == "sfwRating":
                    m["Xmp.variety." + k] = int(v)
                elif k == "extraData":
                    m["Xmp.variety." + k] = json.dumps(v, sort_keys=True)
                else:
                    m["Xmp.variety." + k] = v
            m.save_file()
            return True
        except Exception:
            # could not write metadata inside file, use json instead
            logger.exception(
                lambda: "Could not write metadata directly in file, trying json metadata: "
                + filename
            )
            try:
                with open(filename + ".metadata.json", "w", encoding="utf8") as f:
                    f.write(json.dumps(info, indent=4, ensure_ascii=False, sort_keys=True))
                    return True
            except Exception:
                logger.exception(lambda: "Could not write metadata for file " + filename)
                return False

    @staticmethod
    def read_metadata(filename):
        """
        Reads the metadata written by write_metadata: from inside the file when possible,
        otherwise from filename + ".metadata.json".
        :return: dict with the values that could be read, or None when both ways fail
        """
        try:
            m = VarietyMetadata(filename)

            info = {}
            for k in [
                "sourceName",
                "sourceLocation",
                "sourceURL",
                "sourceType",
                "imageURL",
                "author",
                "authorURL",
                "noOriginPage",
            ]:
                if "Xmp.variety." + k in m:
                    info[k] = m["Xmp.variety." + k]

            # Each of the remaining values is optional: a missing or unreadable one is skipped

            try:
                info["sfwRating"] = int(m["Xmp.variety.sfwRating"])
            except Exception:
                pass

            try:
                info["author"] = m["Xmp.dc.creator"][0]
            except Exception:
                pass

            try:
                info["headline"] = m["Iptc.Application2.Headline"][0]
            except Exception:
                pass

            try:
                info["description"] = m.get_comment()
            except Exception:
                pass

            try:
                info["extraData"] = json.loads(m["Xmp.variety.extraData"])
            except Exception:
                pass

            try:
                info["keywords"] = m["Iptc.Application2.Keywords"]
            except Exception:
                try:
                    info["keywords"] = m["Xmp.dc.subject"]
                except Exception:
                    pass

            return info

        except Exception:
            # could not read metadata inside file, try reading json metadata instead
            try:
                with open(filename + ".metadata.json", encoding="utf8") as f:
                    return json.loads(f.read())

            except Exception:
                return None

    @staticmethod
    def set_rating(filename, rating):
        """
        Stores rating in the file's Xmp.xmp.Rating, Exif.Image.Rating and
        Exif.Image.RatingPercent tags, or removes these tags when rating is None.
        :raises ValueError: when rating is not None and is outside -1..5
        """
        if rating is not None and (rating < -1 or rating > 5):
            raise ValueError("Rating should be between -1 and 5, or None")

        m = VarietyMetadata(filename)

        if rating is None:
            for key in ["Xmp.xmp.Rating", "Exif.Image.Rating", "Exif.Image.RatingPercent"]:
                if key in m:
                    del m[key]  # pylint: disable=unsupported-delete-operation
        else:
            m["Xmp.xmp.Rating"] = rating
            m["Exif.Image.Rating"] = max(0, rating)
            if rating >= 1:
                m["Exif.Image.RatingPercent"] = (rating - 1) * 25
            elif "Exif.Image.RatingPercent" in m:
                del m["Exif.Image.RatingPercent"]  # pylint: disable=unsupported-delete-operation

        m.save_file()

    @staticmethod
    def get_rating(filename):
        """
        Returns the rating stored in the file, clamped to -1..5, or None when no rating tag
        is present. Xmp.xmp.Rating takes precedence, then Exif.Image.Rating, then
        Exif.Image.RatingPercent.
        """
        m = VarietyMetadata(filename)
        rating = None
        if "Xmp.xmp.Rating" in m:
            rating = m["Xmp.xmp.Rating"]
        elif "Exif.Image.Rating" in m:
            rating = m["Exif.Image.Rating"]
        elif "Exif.Image.RatingPercent" in m:
            rating = m["Exif.Image.RatingPercent"] // 25 + 1
        if rating is not None:
            rating = max(-1, min(5, rating))
        return rating

    @staticmethod
    def get_size(image):
        """
        Returns (width, height) of the image file as reported by GdkPixbuf.
        :raises Exception: when GdkPixbuf reports no format for the file
        """
        image_format, image_width, image_height = GdkPixbuf.Pixbuf.get_file_info(image)
        if not image_format:
            raise Exception("Not an image or unsupported image format")
        else:
            return image_width, image_height

    @staticmethod
    def find_unique_name(filename):
        """
        Returns filename if nothing exists at that path, otherwise the first of
        name_1.ext, name_2.ext, ... that does not exist.
        """
        # splitext only looks at the last path component, so dots in folder names
        # (e.g. ~/.config/...) are not mistaken for the start of the extension
        before_extension, extension = os.path.splitext(filename)
        i = 1
        f = filename
        while os.path.exists(f):
            f = before_extension + "_" + str(i) + extension
            i += 1
        return f

    @staticmethod
    def request(url, data=None, stream=False, method=None, timeout=5, headers=None):
        """
        Performs an HTTP request with Variety's User-Agent and returns the response.
        :param url: URL to request; protocol-relative URLs ("//host/...") are requested over http
        :param data: request body; when given and method is not, POST is used
        :param method: HTTP method; defaults to POST when data is given, GET otherwise
        :param headers: extra headers, taking precedence over the default ones
        :raises requests.exceptions.HTTPError: for 4xx/5xx responses (via raise_for_status)
        """
        if url.startswith("//"):
            url = "http:" + url
        headers = headers or {}
        headers = {"User-Agent": USER_AGENT, "Cache-Control": "max-age=0", **headers}
        method = method if method else "POST" if data else "GET"
        try:
            r = requests.request(
                method=method,
                url=url,
                data=data,
                headers=headers,
                stream=stream,
                allow_redirects=True,
                timeout=timeout,
            )
            r.raise_for_status()
            return r
        except requests.exceptions.SSLError:
            logger.exception("SSL Error for url %s:" % url)
            raise

    @staticmethod
    def request_write_to(r, f):
        """Writes the content of response r to the file object f in chunks of 1024 bytes."""
        for chunk in r.iter_content(1024):
            f.write(chunk)

    @staticmethod
    def fetch(url, data=None, **request_kwargs):
        """Requests url via Util.request and returns the response text."""
        return Util.request(url, data, **request_kwargs).text

    @staticmethod
    def fetch_bytes(url, data=None, **request_kwargs):
        """Requests url via Util.request and returns the response content as bytes."""
        return Util.request(url, data, **request_kwargs).content

    @staticmethod
    def fetch_json(url, data=None, **request_kwargs):
        """Requests url via Util.request and returns the response parsed as JSON."""
        return Util.request(url, data, **request_kwargs).json()

    @staticmethod
    def html_soup(url, data=None, **request_kwargs):
        """Fetches url and returns it parsed by BeautifulSoup with the "lxml" parser."""
        return bs4.BeautifulSoup(Util.fetch(url, data, **request_kwargs), "lxml")

    @staticmethod
    def xml_soup(url, data=None, **request_kwargs):
        """Fetches url and returns it parsed by BeautifulSoup with the "xml" parser."""
        return bs4.BeautifulSoup(Util.fetch(url, data, **request_kwargs), "xml")

    @staticmethod
    def unxor(text, key):
        """
        Base64-decodes text and XORs the resulting bytes with the characters of key,
        repeating key as needed.
        :param text: base64-encoded bytes
        :param key: str key
        :return: the decoded str
        """
        ciphertext = base64.decodebytes(text)
        return "".join(chr(x ^ ord(y)) for (x, y) in zip(ciphertext, cycle(key)))

    @staticmethod
    def folderpath(folder):
        """Returns the normalized path of folder with a trailing "/"."""
        p = os.path.normpath(folder)
        if not p.endswith("/"):
            p += "/"
        return p

    @staticmethod
    def compute_trimmed_offsets(image_size, screen_size):
        """Computes what width or height of the wallpaper image will be trimmed on each side, as it is zoomed in to fill
        the whole screen. Returns a tuple (h, v) in which h or v will be zero. The other one is the pixel
        width or height that will be trimmed on each one of the sides of the image (top and down or left and right).
        :param image_size: (width, height) of the image
        :param screen_size: (width, height) of the screen
        """
        iw, ih = image_size
        screen_w, screen_h = screen_size
        screen_ratio = float(screen_w) / screen_h
        hoffset = voffset = 0
        if (
            screen_ratio > float(iw) / ih
        ):  # image is "taller" than the screen ratio - need to offset vertically
            scaledw = float(screen_w)
            scaledh = ih * scaledw / iw
            voffset = int((scaledh - float(scaledw) / screen_ratio) / 2)
        else:  # image is "wider" than the screen ratio - need to offset horizontally
            scaledh = float(screen_h)
            scaledw = iw * scaledh / ih
            hoffset = int((scaledw - float(scaledh) * screen_ratio) / 2)

        logger.info(
            lambda: "Trimmed offsets debug info: w:%d, h:%d, ratio:%f, iw:%d, ih:%d, scw:%d, sch:%d, ho:%d, vo:%d"
            % (screen_w, screen_h, screen_ratio, iw, ih, scaledw, scaledh, hoffset, voffset)
        )
        return hoffset, voffset

    @staticmethod
    def get_scaled_size(image):
        """Computes the size to which the image is scaled to fit the screen: original_size * scale_ratio = scaled_size"""
        iw, ih = Util.get_size(image)
        screen_w, screen_h = (
            Gdk.Screen.get_default().get_width(),
            Gdk.Screen.get_default().get_height(),
        )
        screen_ratio = float(screen_w) / screen_h
        if (
            screen_ratio > float(iw) / ih
        ):  # image is "taller" than the screen ratio - need to offset vertically
            return screen_w, int(round(ih * float(screen_w) / iw))
        else:  # image is "wider" than the screen ratio - need to offset horizontally
            return int(round(iw * float(screen_h) / ih)), screen_h

    @staticmethod
    def get_scale_to_screen_ratio(image):
        """Computes the ratio by which the image is scaled to fit the screen: original_size * scale_ratio = scaled_size"""
        # NOTE(review): the ratio is truncated to an int below, so e.g. 1.5 becomes 1 and any
        # downscaling ratio becomes 0. This looks unintended given the docstring, but is kept
        # as is because callers may depend on the int result - confirm before changing.
        iw, ih = Util.get_size(image)
        screen_w, screen_h = (
            Gdk.Screen.get_default().get_width(),
            Gdk.Screen.get_default().get_height(),
        )
        screen_ratio = float(screen_w) / screen_h
        if (
            screen_ratio > float(iw) / ih
        ):  # image is "taller" than the screen ratio - need to offset vertically
            return int(float(screen_w) / iw)
        else:  # image is "wider" than the screen ratio - need to offset horizontally
            return int(float(screen_h) / ih)

    @staticmethod
    def gtk_to_fcmatch_font(gtk_font_name):
        """
        Converts a GTK font name to a tuple (fc_match_pattern, size): the pattern is the font
        family followed by ":" and the remaining words of the name joined with ":", the size is
        the text after the last space of the name.
        """
        fd = Pango.FontDescription(gtk_font_name)
        family = fd.get_family()
        size = gtk_font_name[gtk_font_name.rindex(" ") :].strip()
        rest = gtk_font_name.replace(family, "").strip().replace(" ", ":")
        return family + ":" + rest, size

    @staticmethod
    def file_in(file, folder):
        """
        Returns whether the normalized path file equals or lies under the normalized path folder.
        The comparison is purely textual, the filesystem is not consulted.
        """
        file_path = os.path.normpath(file)
        folder_path = os.path.normpath(folder)
        if file_path == folder_path:
            return True
        # Compare against the folder plus a separator, so that /a/bc is not considered to be
        # inside /a/b. rstrip covers the root folder, which already ends with a separator.
        return file_path.startswith(folder_path.rstrip(os.sep) + os.sep)

    @staticmethod
    def same_file_paths(f1, f2):
        """Returns whether f1 and f2 are the same path after normalization."""
        return os.path.normpath(f1) == os.path.normpath(f2)

    @staticmethod
    def collapseuser(path):
        """Replaces a leading home folder in path with "~/". Other paths are returned unchanged."""
        home = os.path.expanduser("~") + "/"
        # Plain prefix check: the home path may contain characters that are special in a regex
        if path.startswith(home):
            return "~/" + path[len(home) :]
        return path

    @staticmethod
    def compare_versions(v1, v2):
        """Returns 0 when versions v1 and v2 are equal, -1 when v1 is older than v2, 1 otherwise."""
        from pkg_resources import parse_version

        pv1 = parse_version(v1)
        pv2 = parse_version(v2)

        if pv1 == pv2:
            return 0
        else:
            return -1 if pv1 < pv2 else 1

    @staticmethod
    def md5(s):
        """Returns the hex MD5 digest of s; a str is encoded as UTF-8 first."""
        if isinstance(s, str):
            s = s.encode("utf-8")
        return hashlib.md5(s).hexdigest()

    @staticmethod
    def md5file(file):
        """Returns the hex MD5 digest of the contents of file (the file is read in one go)."""
        with open(file, mode="rb") as f:
            return Util.md5(f.read())

    @staticmethod
    def random_hash():
        """Returns a random string of 32 lowercase hex characters."""
        try:
            return codecs.encode(os.urandom(16), "hex_codec").decode("utf8")
        except Exception:
            return "".join(random.choice(string.hexdigits) for _ in range(32)).lower()

    @staticmethod
    def get_file_icon_name(path):
        """Returns the first icon name Gio reports for path, or "folder" when that fails."""
        try:
            f = Gio.File.new_for_path(os.path.normpath(os.path.expanduser(path)))
            query_info = f.query_info("standard::icon", Gio.FileQueryInfoFlags.NONE, None)
            return query_info.get_attribute_object("standard::icon").get_names()[0]
        except Exception:
            logger.exception(lambda: "Exception while obtaining folder icon for %s:" % path)
            return "folder"

    @staticmethod
    def is_home_encrypted():
        """Returns whether a folder for the user's home exists under /home/.ecryptfs/."""
        return os.path.isdir(os.path.expanduser("~").replace("/home/", "/home/.ecryptfs/"))

    @staticmethod
    def get_xdg_pictures_folder():
        """Returns the user's Pictures folder as reported by GLib, defaulting to ~/Pictures."""
        try:
            pics_folder = GLib.get_user_special_dir(GLib.USER_DIRECTORY_PICTURES)
            if not pics_folder:
                raise Exception("Could not get path to Pictures folder. Defaulting to ~/Pictures.")
            return pics_folder
        except Exception:
            logger.exception(
                lambda: "Could not get path to Pictures folder. Defaulting to ~/Pictures."
            )
            return os.path.expanduser("~/Pictures")

    @staticmethod
    def superuser_exec(*command_args):
        """
        Runs the given command through pkexec.
        :raises subprocess.CalledProcessError: when the command exits with a non-zero status
        """
        logger.warning(lambda: "Executing as superuser: %s" % command_args)
        subprocess.check_call(["pkexec"] + list(command_args))

    @staticmethod
    def safe_map(f, l):
        """Generator yielding f(element) for the elements of l, skipping those for which f raises."""
        for element in l:
            try:
                yield f(element)
            except Exception:
                continue

    @staticmethod
    def get_thumbnail_data(image, width, height):
        """
        Loads image at no more than width x height and saves it as "jpeg" to a buffer.
        :return: the second element of save_to_bufferv's result - presumably the encoded
        bytes; confirm against the GdkPixbuf bindings
        """
        pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_size(image, width, height)
        return pixbuf.save_to_bufferv("jpeg", [], [])[1]

    @staticmethod
    def is_alive_and_image(url):
        """Returns True when a HEAD request to url succeeds with an image/* content type."""
        try:
            r = Util.request(url, method="head")
            return r.headers.get("content-type", "").startswith("image/")
        except Exception:
            return False

    @staticmethod
    def is_dead_or_not_image(url):
        """
        Returns True when url is empty or unparseable, is on a known-dead host, cannot be
        requested, or does not respond with an image/* content type.
        Unexpected non-request errors count as "not dead" and return False.
        """
        if not url:
            return True

        try:
            host = urllib.parse.urlparse(url).netloc
            if host.startswith("interfacelift.com"):
                return False

            if "wallbase.cc" in host or "ns223506.ovh.net" in host:
                return True
        except Exception:
            return True

        try:
            r = Util.request(url, method="head")
            return not r.headers.get("content-type", "").startswith("image/")
        except requests.exceptions.RequestException:
            return True
        except Exception:
            return False

    # makes the Gtk thread execute the given callback.
    @staticmethod
    def add_mainloop_task(callback, *args):
        """Schedules callback(*args) as a one-off idle task via Gdk.threads_add_idle."""

        def cb(args):
            args[0](*args[1:])
            # Returning False removes the idle source, so the callback runs only once
            return False

        args = [callback] + list(args)
        Gdk.threads_add_idle(GLib.PRIORITY_DEFAULT, cb, args)

    @staticmethod
    def is_unity():
        """Returns whether XDG_CURRENT_DESKTOP is "unity" (case-insensitive)."""
        return os.getenv("XDG_CURRENT_DESKTOP", "").lower() == "unity"

    @staticmethod
    def start_daemon(target):
        """Runs target in a new daemon thread and returns the started thread."""
        daemon_thread = threading.Thread(target=target)
        daemon_thread.daemon = True
        daemon_thread.start()
        return daemon_thread

    @staticmethod
    def check_variety_slideshow_present():
        """Returns whether a "variety-slideshow" executable is found on the PATH."""
        return bool(shutil.which("variety-slideshow"))

    @staticmethod
    def convert_to_filename(url):
        """
        Removes "http://" and "https://" from url and replaces every character other than
        ASCII letters, digits and "_" with "_".
        """
        url = re.sub(r"http://", "", url)
        url = re.sub(r"https://", "", url)
        valid_chars = "_%s%s" % (string.ascii_letters, string.digits)
        return "".join(c if c in valid_chars else "_" for c in url)

    @staticmethod
    def safe_unlink(filepath):
        """Deletes filepath; failures are logged and ignored."""
        try:
            os.unlink(filepath)
        except Exception:
            logger.exception(lambda: "Could not delete {}, ignoring".format(filepath))

    @staticmethod
    def copy_with_replace(from_path, to_path, search_replace_map):
        """
        Copies the text file from_path to to_path, replacing every occurrence of each key of
        search_replace_map with its value.
        """
        with open(from_path, "r") as file:
            data = file.read()
        for search, replace in search_replace_map.items():
            data = data.replace(search, replace)
        # Write to a temporary name and rename, so that to_path never holds partial content
        with open(to_path + ".partial", "w") as file:
            file.write(data)
            file.flush()
        os.rename(to_path + ".partial", to_path)

    @staticmethod
    def get_exec_path():
        """Returns the absolute path of sys.argv[0]."""
        return os.path.abspath(sys.argv[0])

    @staticmethod
    def get_folder_size(start_path):
        """Returns the total size in bytes of the files under start_path, not counting symlinks."""
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(start_path):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                if not os.path.islink(fp):
                    total_size += os.path.getsize(fp)
        return total_size

    @staticmethod
    def get_screen_width():
        """Returns the width of the default Gdk screen."""
        return Gdk.Screen.get_default().get_width()


def on_gtk(f):
    """
    Decorator that schedules every call of f via Util.add_mainloop_task instead of running it
    directly. The wrapper accepts positional arguments only and returns None.
    """

    @functools.wraps(f)
    def wrapped(*args):
        Util.add_mainloop_task(f, *args)

    return wrapped


def safe_print(text, ascii_text=None, file=sys.stdout):
    """
    Python's print throws UnicodeEncodeError if the terminal encoding is borked. This version tries print, then logging, then printing the ascii text when one is present.
    It does not throw exceptions even if it fails.
    :param text: Text to print, str or unicode, possibly with non-ascii symbols in it
    :param ascii_text: optional. Original untranslated ascii version of the text when present.
    :param file: file object to print to, sys.stdout by default
    """
    try:
        print(text, file=file)
    except Exception:  # UnicodeEncodeError can happen here if the terminal is strangely configured, but we are playing safe and catching everything
        try:
            logging.getLogger("variety").error(
                "Error printing non-ascii text, terminal encoding is %s" % sys.stdout.encoding
            )
            if ascii_text:
                try:
                    # The fallback goes to the same stream the caller asked for
                    print(ascii_text, file=file)
                    return
                except Exception:
                    pass
            logging.getLogger("variety").warning(text)
        except Exception:
            pass