#!/usr/local/bin/python3.8


__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'

'''Read meta information from epub files'''


import io
import os
import posixpath
from contextlib import closing

from calibre import CurrentDir
from calibre.ebooks.metadata.opf import (
    get_metadata as get_metadata_from_opf, set_metadata as set_metadata_opf
)
from calibre.ebooks.metadata.opf2 import OPF
from calibre.utils.xml_parse import safe_xml_fromstring
from calibre.ptempfile import TemporaryDirectory
from calibre.utils.localunzip import LocalZipFile
from calibre.utils.zipfile import BadZipfile, ZipFile, safe_replace


class EPubException(Exception):
    """Base class for the EPUB errors raised by this module."""


class OCFException(EPubException):
    """Error related to the OCF (container) layer of an EPUB."""


class ContainerException(OCFException):
    """Error related to the OCF ``container.xml`` file."""


class Container(dict):
    """Mapping of rootfile media-type -> full-path parsed from container.xml.

    If two ``<rootfile>`` elements share a media-type, the later one wins
    because entries are stored by plain dict assignment.
    """

    def __init__(self, stream=None):
        """Parse *stream*, a file-like object yielding the container XML.

        A falsy *stream* produces an empty mapping.

        Raises:
            EPubException: if the root element's ``version`` attribute is not
                ``'1.0'``, if there is no ``<rootfiles>`` child, or if a
                ``<rootfile>`` lacks ``media-type`` or ``full-path``.
        """
        if not stream:
            return
        container = safe_xml_fromstring(stream.read())
        if container.get('version', None) != '1.0':
            raise EPubException("unsupported version of OCF")
        # local-name() is used so that the match ignores the XML namespace.
        rootfiles = container.xpath('./*[local-name()="rootfiles"]')
        if not rootfiles:
            raise EPubException("<rootfiles/> element missing")
        for rootfile in rootfiles[0].xpath('./*[local-name()="rootfile"]'):
            mt, fp = rootfile.get('media-type'), rootfile.get('full-path')
            if not mt or not fp:
                raise EPubException("<rootfile/> element malformed")
            self[mt] = fp


class OCF:
    """Holder of the OCF constants; not instantiable directly.

    Subclasses must provide their own ``__init__`` (this one always raises).
    """

    MIMETYPE = 'application/epub+zip'
    CONTAINER_PATH = 'META-INF/container.xml'
    ENCRYPTION_PATH = 'META-INF/encryption.xml'

    def __init__(self):
        raise NotImplementedError('Abstract base class')


class Encryption:
    """Index of the per-resource algorithms declared in encryption.xml."""

    # Algorithms in this set are not reported as encryption by is_encrypted().
    OBFUSCATION_ALGORITHMS = frozenset(['http://ns.adobe.com/pdf/enc#RC',
            'http://www.idpf.org/2008/embedding'])

    def __init__(self, raw):
        """Build the URI -> algorithm table from *raw* XML.

        A falsy *raw* yields an empty table (``self.root`` is ``None``).
        """
        self.root = safe_xml_fromstring(raw) if raw else None
        self.entries = {}
        if self.root is not None:
            for em in self.root.xpath('descendant::*[contains(name(), "EncryptionMethod")]'):
                algorithm = em.get('Algorithm', '')
                # The CipherReference is looked up under the parent of the
                # EncryptionMethod element; only the first match is used.
                cr = em.getparent().xpath('descendant::*[contains(name(), "CipherReference")]')
                if cr:
                    uri = cr[0].get('URI', '')
                    if uri and algorithm:
                        self.entries[uri] = algorithm

    def is_encrypted(self, uri):
        """Return True if *uri* has an algorithm that is not mere obfuscation.

        URIs with no entry at all are reported as not encrypted.
        """
        algo = self.entries.get(uri, None)
        return algo is not None and algo not in self.OBFUSCATION_ALGORITHMS


class OCFReader(OCF):
    """Common logic for reading an OCF container.

    Subclasses must set ``self.root`` and implement ``open(name)`` before
    calling this ``__init__``; ``open`` is expected to return a readable
    binary file-like object.
    """

    def __init__(self):
        """Validate the mimetype (warn only) and locate the OPF file.

        Raises:
            EPubException: if container.xml is missing (signalled by ``open``
                raising ``KeyError``), is malformed, or has no rootfile entry
                for the OPF media type.
        """
        # A wrong or missing mimetype is only reported, never fatal.
        try:
            mimetype = self.read_bytes('mimetype').decode('utf-8').rstrip()
            if mimetype != OCF.MIMETYPE:
                print('WARNING: Invalid mimetype declaration', mimetype)
        except Exception:
            # Was a bare ``except:``, which also trapped KeyboardInterrupt
            # and SystemExit.
            print('WARNING: Epub doesn\'t contain a valid mimetype declaration')

        try:
            with closing(self.open(OCF.CONTAINER_PATH)) as f:
                self.container = Container(f)
        except KeyError as err:
            raise EPubException("missing OCF container.xml file") from err
        # .get() rather than [] so that a container without an OPF rootfile
        # reaches the EPubException below instead of leaking a bare KeyError.
        self.opf_path = self.container.get(OPF.MIMETYPE)
        if not self.opf_path:
            raise EPubException("missing OPF package file entry in container")
        self._opf_cached = self._encryption_meta_cached = None

    @property
    def opf(self):
        """The parsed OPF object, created on first access and then cached.

        Raises:
            EPubException: if opening the OPF path raises ``KeyError``.
        """
        if self._opf_cached is None:
            try:
                with closing(self.open(self.opf_path)) as f:
                    self._opf_cached = OPF(f, self.root, populate_spine=False)
            except KeyError as err:
                raise EPubException("missing OPF package file") from err
        return self._opf_cached

    @property
    def encryption_meta(self):
        """The :class:`Encryption` table, created on first access and cached.

        Any failure reading or parsing encryption.xml results in an empty
        table rather than an error.
        """
        if self._encryption_meta_cached is None:
            try:
                self._encryption_meta_cached = Encryption(self.read_bytes(self.ENCRYPTION_PATH))
            except Exception:
                self._encryption_meta_cached = Encryption(None)
        return self._encryption_meta_cached

    def read_bytes(self, name):
        """Return the full contents of container member *name*."""
        # Close the handle explicitly instead of leaving it to the garbage
        # collector.
        with closing(self.open(name)) as f:
            return f.read()


class OCFZipReader(OCFReader):
    """OCF reader backed by a ZIP archive."""

    def __init__(self, stream, mode='r', root=None):
        """Open *stream* as an EPUB archive.

        Args:
            stream: an already opened ``ZipFile``/``LocalZipFile`` (used
                as-is) or anything ``ZipFile`` accepts.
            mode: passed to ``ZipFile`` when *stream* is not already an
                archive object.
            root: value for ``self.root``. When ``None`` it is the absolute
                directory of ``stream.name`` if that attribute exists and is
                truthy, otherwise the current working directory.

        Raises:
            EPubException: if *stream* is not a valid ZIP file, or for any of
                the reasons documented on ``OCFReader.__init__``.
        """
        if isinstance(stream, (LocalZipFile, ZipFile)):
            self.archive = stream
        else:
            try:
                self.archive = ZipFile(stream, mode=mode)
            except BadZipfile as err:
                raise EPubException("not a ZIP .epub OCF container") from err
        self.root = root
        if self.root is None:
            name = getattr(stream, 'name', False)
            if name:
                self.root = os.path.abspath(os.path.dirname(name))
            else:
                self.root = os.getcwd()
        super().__init__()

    def open(self, name):
        """Return a readable binary file-like object for member *name*.

        For a ``LocalZipFile`` the archive's own ``open`` is used; otherwise
        the member is read fully and wrapped in a ``BytesIO``.
        """
        if isinstance(self.archive, LocalZipFile):
            return self.archive.open(name)
        return io.BytesIO(self.archive.read(name))

    def read_bytes(self, name):
        """Return the full contents of member *name* via the archive."""
        return self.archive.read(name)


def get_zip_reader(stream, root=None):
    """Return an :class:`OCFZipReader` for *stream*.

    ``ZipFile`` is tried first; if it fails for any reason the stream is
    rewound and ``LocalZipFile`` is used instead.
    """
    try:
        zf = ZipFile(stream, mode='r')
    except Exception:
        stream.seek(0)
        zf = LocalZipFile(stream)
    return OCFZipReader(zf, root=root)


class OCFDirReader(OCFReader):
    """OCF reader backed by an unpacked EPUB in the directory *path*."""

    def __init__(self, path):
        self.root = path
        super().__init__()

    def open(self, path):
        """Open *path*, relative to the root directory, for binary reading."""
        # NOTE(review): ``lopen`` is neither defined nor imported in this
        # file; presumably calibre installs it as a builtin at startup --
        # confirm before running this module outside calibre.
        return lopen(os.path.join(self.root, path), 'rb')

    def read_bytes(self, path):
        """Return the full contents of the file at *path*."""
        with self.open(path) as f:
            return f.read()


def render_cover(cpage, zf, reader=None):
    """Render the page *cpage* of archive *zf* to use as a cover.

    Returns ``None`` when *cpage* is falsy, when *reader* reports it as
    encrypted, or when it does not exist after extraction; otherwise returns
    the result of ``render_html_svg_workaround``.
    """
    from calibre.ebooks import render_html_svg_workaround
    from calibre.utils.logging import default_log

    if not cpage:
        return
    if reader is not None and reader.encryption_meta.is_encrypted(cpage):
        return

    with TemporaryDirectory('_epub_meta') as tdir:
        # The whole archive is extracted into the temporary directory, which
        # is made the current directory for the duration of the rendering.
        with CurrentDir(tdir):
            zf.extractall()
            cpage = os.path.join(tdir, cpage)
            if not os.path.exists(cpage):
                return
            return render_html_svg_workaround(cpage, default_log)


def get_cover(raster_cover, first_spine_item, reader):
    """Return cover data for the book opened by *reader*, or ``None``.

    The raster cover is preferred. An encrypted raster cover yields ``None``
    immediately; a raster cover that cannot be read falls back to rendering
    *first_spine_item*.
    """
    zf = reader.archive

    if raster_cover:
        if reader.encryption_meta.is_encrypted(raster_cover):
            return
        try:
            return reader.read_bytes(raster_cover)
        except Exception:
            # Unreadable raster cover: fall through to the rendered page.
            pass

    return render_cover(first_spine_item, zf, reader=reader)


def get_metadata(stream, extract_cover=True):
    """ Return metadata as a :class:`Metadata` object

    *stream* is a seekable file-like object containing the EPUB. When
    *extract_cover* is true the cover is looked up as well and, if found,
    stored in ``mi.cover_data`` tagged as ``'jpg'``; cover failures are
    printed and otherwise ignored. ``mi.timestamp`` is always reset to
    ``None`` on the returned object.
    """
    stream.seek(0)
    reader = get_zip_reader(stream)
    opfbytes = reader.read_bytes(reader.opf_path)
    mi, ver, raster_cover, first_spine_item = get_metadata_from_opf(opfbytes)
    if extract_cover:
        # Paths coming from the OPF are resolved against the OPF's own
        # directory inside the archive.
        base = posixpath.dirname(reader.opf_path)
        if raster_cover:
            raster_cover = posixpath.normpath(posixpath.join(base, raster_cover))
        if first_spine_item:
            first_spine_item = posixpath.normpath(posixpath.join(base, first_spine_item))
        try:
            cdata = get_cover(raster_cover, first_spine_item, reader)
            if cdata is not None:
                mi.cover_data = ('jpg', cdata)
        except Exception:
            import traceback
            traceback.print_exc()
    mi.timestamp = None
    return mi


def get_quick_metadata(stream):
    """Return the metadata of *stream* without extracting the cover."""
    return get_metadata(stream, False)


def serialize_cover_data(new_cdata, cpath):
    """Convert *new_cdata* to the image format named by *cpath*'s extension."""
    from calibre.utils.img import save_cover_data_to
    return save_cover_data_to(new_cdata, data_fmt=os.path.splitext(cpath)[1][1:])


def set_metadata(stream, mi, apply_null=False, update_timestamp=False, force_identifiers=False, add_missing_cover=True):
    """Write the metadata *mi* into the EPUB contained in *stream*.

    The OPF file is regenerated by ``set_metadata_opf`` and replaced inside
    the archive. New cover data is taken from ``mi.cover_data[1]`` or, failing
    that, from the file at ``mi.cover``. The existing raster cover is only
    overwritten when it is not encrypted and has a ``.png``, ``.jpg`` or
    ``.jpeg`` extension. The remaining keyword arguments are forwarded
    unchanged to ``set_metadata_opf``.
    """
    stream.seek(0)
    reader = get_zip_reader(stream, root=os.getcwd())
    new_cdata = None
    try:
        new_cdata = mi.cover_data[1]
        if not new_cdata:
            # Routed through the handler below to try the mi.cover fallback.
            raise Exception('no cover')
    except Exception:
        try:
            # NOTE(review): ``lopen`` is not defined in this file; presumably
            # a calibre-provided builtin -- confirm.
            with lopen(mi.cover, 'rb') as f:
                new_cdata = f.read()
        except Exception:
            pass

    opfbytes, ver, raster_cover = set_metadata_opf(
        reader.read_bytes(reader.opf_path), mi, cover_prefix=posixpath.dirname(reader.opf_path),
        cover_data=new_cdata, apply_null=apply_null, update_timestamp=update_timestamp,
        force_identifiers=force_identifiers, add_missing_cover=add_missing_cover)
    cpath = None
    replacements = {}
    if new_cdata and raster_cover:
        try:
            cpath = posixpath.join(posixpath.dirname(reader.opf_path),
                    raster_cover)
            cover_replaceable = not reader.encryption_meta.is_encrypted(cpath) and \
                    os.path.splitext(cpath)[1].lower() in ('.png', '.jpg', '.jpeg')
            if cover_replaceable:
                replacements[cpath] = serialize_cover_data(new_cdata, cpath)
        except Exception:
            # A cover problem must not prevent the OPF from being updated.
            import traceback
            traceback.print_exc()

    if isinstance(reader.archive, LocalZipFile):
        reader.archive.safe_replace(reader.container[OPF.MIMETYPE], opfbytes,
            extra_replacements=replacements, add_missing=True)
    else:
        safe_replace(stream, reader.container[OPF.MIMETYPE], opfbytes,
            extra_replacements=replacements, add_missing=True)

    # Best-effort cleanup of the serialized cover. The lookup uses .get() so
    # that "cover was not replaced" no longer relies on a swallowed KeyError.
    # NOTE(review): this assumes serialize_cover_data may return a named
    # file-like object; if it returns plain bytes the AttributeError is
    # deliberately ignored -- confirm against save_cover_data_to.
    replacement = replacements.get(cpath) if cpath is not None else None
    if replacement is not None:
        try:
            replacement.close()
            os.remove(replacement.name)
        except Exception:
            pass