1# coding: utf-8 2from __future__ import unicode_literals 3 4import base64 5import imghdr 6import os 7import subprocess 8import re 9 10try: 11 from mutagen.flac import Picture, FLAC 12 from mutagen.mp4 import MP4, MP4Cover 13 from mutagen.oggopus import OggOpus 14 from mutagen.oggvorbis import OggVorbis 15 has_mutagen = True 16except ImportError: 17 has_mutagen = False 18 19from .common import PostProcessor 20from .ffmpeg import ( 21 FFmpegPostProcessor, 22 FFmpegThumbnailsConvertorPP, 23) 24from ..utils import ( 25 check_executable, 26 encodeArgument, 27 encodeFilename, 28 error_to_compat_str, 29 Popen, 30 PostProcessingError, 31 prepend_extension, 32 shell_quote, 33) 34 35 36class EmbedThumbnailPPError(PostProcessingError): 37 pass 38 39 40class EmbedThumbnailPP(FFmpegPostProcessor): 41 42 def __init__(self, downloader=None, already_have_thumbnail=False): 43 FFmpegPostProcessor.__init__(self, downloader) 44 self._already_have_thumbnail = already_have_thumbnail 45 46 def _get_thumbnail_resolution(self, filename, thumbnail_dict): 47 def guess(): 48 width, height = thumbnail_dict.get('width'), thumbnail_dict.get('height') 49 if width and height: 50 return width, height 51 52 try: 53 size_regex = r',\s*(?P<w>\d+)x(?P<h>\d+)\s*[,\[]' 54 size_result = self.run_ffmpeg(filename, None, ['-hide_banner'], expected_retcodes=(1,)) 55 mobj = re.search(size_regex, size_result) 56 if mobj is None: 57 return guess() 58 except PostProcessingError as err: 59 self.report_warning('unable to find the thumbnail resolution; %s' % error_to_compat_str(err)) 60 return guess() 61 return int(mobj.group('w')), int(mobj.group('h')) 62 63 def _report_run(self, exe, filename): 64 self.to_screen('%s: Adding thumbnail to "%s"' % (exe, filename)) 65 66 @PostProcessor._restrict_to(images=False) 67 def run(self, info): 68 filename = info['filepath'] 69 temp_filename = prepend_extension(filename, 'temp') 70 71 if not info.get('thumbnails'): 72 self.to_screen('There aren\'t any thumbnails to embed') 73 return [], info 74 75 idx = next((-i for i, t in enumerate(info['thumbnails'][::-1], 1) if t.get('filepath')), None) 76 if idx is None: 77 self.to_screen('There are no thumbnails on disk') 78 return [], info 79 thumbnail_filename = info['thumbnails'][idx]['filepath'] 80 if not os.path.exists(encodeFilename(thumbnail_filename)): 81 self.report_warning('Skipping embedding the thumbnail because the file is missing.') 82 return [], info 83 84 # Correct extension for WebP file with wrong extension (see #25687, #25717) 85 convertor = FFmpegThumbnailsConvertorPP(self._downloader) 86 convertor.fixup_webp(info, idx) 87 88 original_thumbnail = thumbnail_filename = info['thumbnails'][idx]['filepath'] 89 90 # Convert unsupported thumbnail formats to PNG (see #25687, #25717) 91 # Original behavior was to convert to JPG, but since JPG is a lossy 92 # format, there will be some additional data loss. 93 # PNG, on the other hand, is lossless. 94 thumbnail_ext = os.path.splitext(thumbnail_filename)[1][1:] 95 if thumbnail_ext not in ('jpg', 'jpeg', 'png'): 96 thumbnail_filename = convertor.convert_thumbnail(thumbnail_filename, 'png') 97 thumbnail_ext = 'png' 98 99 mtime = os.stat(encodeFilename(filename)).st_mtime 100 101 success = True 102 if info['ext'] == 'mp3': 103 options = [ 104 '-c', 'copy', '-map', '0:0', '-map', '1:0', '-id3v2_version', '3', 105 '-metadata:s:v', 'title="Album cover"', '-metadata:s:v', 'comment="Cover (front)"'] 106 107 self._report_run('ffmpeg', filename) 108 self.run_ffmpeg_multiple_files([filename, thumbnail_filename], temp_filename, options) 109 110 elif info['ext'] in ['mkv', 'mka']: 111 options = ['-c', 'copy', '-map', '0', '-dn'] 112 113 mimetype = 'image/%s' % ('png' if thumbnail_ext == 'png' else 'jpeg') 114 old_stream, new_stream = self.get_stream_number( 115 filename, ('tags', 'mimetype'), mimetype) 116 if old_stream is not None: 117 options.extend(['-map', '-0:%d' % old_stream]) 118 new_stream -= 1 119 options.extend([ 120 '-attach', thumbnail_filename, 121 '-metadata:s:%d' % new_stream, 'mimetype=%s' % mimetype, 122 '-metadata:s:%d' % new_stream, 'filename=cover.%s' % thumbnail_ext]) 123 124 self._report_run('ffmpeg', filename) 125 self.run_ffmpeg(filename, temp_filename, options) 126 127 elif info['ext'] in ['m4a', 'mp4', 'mov']: 128 prefer_atomicparsley = 'embed-thumbnail-atomicparsley' in self.get_param('compat_opts', []) 129 # Method 1: Use mutagen 130 if not has_mutagen or prefer_atomicparsley: 131 success = False 132 else: 133 try: 134 self._report_run('mutagen', filename) 135 meta = MP4(filename) 136 # NOTE: the 'covr' atom is a non-standard MPEG-4 atom, 137 # Apple iTunes 'M4A' files include the 'moov.udta.meta.ilst' atom. 138 f = {'jpeg': MP4Cover.FORMAT_JPEG, 'png': MP4Cover.FORMAT_PNG}[imghdr.what(thumbnail_filename)] 139 with open(thumbnail_filename, 'rb') as thumbfile: 140 thumb_data = thumbfile.read() 141 meta.tags['covr'] = [MP4Cover(data=thumb_data, imageformat=f)] 142 meta.save() 143 temp_filename = filename 144 except Exception as err: 145 self.report_warning('unable to embed using mutagen; %s' % error_to_compat_str(err)) 146 success = False 147 148 # Method 2: Use AtomicParsley 149 if not success: 150 success = True 151 atomicparsley = next(( 152 x for x in ['AtomicParsley', 'atomicparsley'] 153 if check_executable(x, ['-v'])), None) 154 if atomicparsley is None: 155 self.to_screen('Neither mutagen nor AtomicParsley was found. Falling back to ffmpeg') 156 success = False 157 else: 158 if not prefer_atomicparsley: 159 self.to_screen('mutagen was not found. Falling back to AtomicParsley') 160 cmd = [encodeFilename(atomicparsley, True), 161 encodeFilename(filename, True), 162 encodeArgument('--artwork'), 163 encodeFilename(thumbnail_filename, True), 164 encodeArgument('-o'), 165 encodeFilename(temp_filename, True)] 166 cmd += [encodeArgument(o) for o in self._configuration_args('AtomicParsley')] 167 168 self._report_run('atomicparsley', filename) 169 self.write_debug('AtomicParsley command line: %s' % shell_quote(cmd)) 170 p = Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 171 stdout, stderr = p.communicate_or_kill() 172 if p.returncode != 0: 173 msg = stderr.decode('utf-8', 'replace').strip() 174 self.report_warning(f'Unable to embed thumbnails using AtomicParsley; {msg}') 175 # for formats that don't support thumbnails (like 3gp) AtomicParsley 176 # won't create to the temporary file 177 if b'No changes' in stdout: 178 self.report_warning('The file format doesn\'t support embedding a thumbnail') 179 success = False 180 181 # Method 3: Use ffmpeg+ffprobe 182 # Thumbnails attached using this method doesn't show up as cover in some cases 183 # See https://github.com/yt-dlp/yt-dlp/issues/2125, https://github.com/yt-dlp/yt-dlp/issues/411 184 if not success: 185 success = True 186 try: 187 options = ['-c', 'copy', '-map', '0', '-dn', '-map', '1'] 188 189 old_stream, new_stream = self.get_stream_number( 190 filename, ('disposition', 'attached_pic'), 1) 191 if old_stream is not None: 192 options.extend(['-map', '-0:%d' % old_stream]) 193 new_stream -= 1 194 options.extend(['-disposition:%s' % new_stream, 'attached_pic']) 195 196 self._report_run('ffmpeg', filename) 197 self.run_ffmpeg_multiple_files([filename, thumbnail_filename], temp_filename, options) 198 except PostProcessingError as err: 199 success = False 200 raise EmbedThumbnailPPError(f'Unable to embed using ffprobe & ffmpeg; {err}') 201 202 elif info['ext'] in ['ogg', 'opus', 'flac']: 203 if not has_mutagen: 204 raise EmbedThumbnailPPError('module mutagen was not found. Please install using `python -m pip install mutagen`') 205 206 self._report_run('mutagen', filename) 207 f = {'opus': OggOpus, 'flac': FLAC, 'ogg': OggVorbis}[info['ext']](filename) 208 209 pic = Picture() 210 pic.mime = 'image/%s' % imghdr.what(thumbnail_filename) 211 with open(thumbnail_filename, 'rb') as thumbfile: 212 pic.data = thumbfile.read() 213 pic.type = 3 # front cover 214 res = self._get_thumbnail_resolution(thumbnail_filename, info['thumbnails'][idx]) 215 if res is not None: 216 pic.width, pic.height = res 217 218 if info['ext'] == 'flac': 219 f.add_picture(pic) 220 else: 221 # https://wiki.xiph.org/VorbisComment#METADATA_BLOCK_PICTURE 222 f['METADATA_BLOCK_PICTURE'] = base64.b64encode(pic.write()).decode('ascii') 223 f.save() 224 temp_filename = filename 225 226 else: 227 raise EmbedThumbnailPPError('Supported filetypes for thumbnail embedding are: mp3, mkv/mka, ogg/opus/flac, m4a/mp4/mov') 228 229 if success and temp_filename != filename: 230 os.replace(temp_filename, filename) 231 232 self.try_utime(filename, mtime, mtime) 233 234 files_to_delete = [thumbnail_filename] 235 if self._already_have_thumbnail: 236 if original_thumbnail == thumbnail_filename: 237 files_to_delete = [] 238 elif original_thumbnail != thumbnail_filename: 239 files_to_delete.append(original_thumbnail) 240 return files_to_delete, info 241