1#!/usr/local/bin/python3.8
2
3
4__license__   = 'GPL v3'
5__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
6
7'''Read meta information from epub files'''
8
9
10import io
11import os
12import posixpath
13from contextlib import closing
14
15from calibre import CurrentDir
16from calibre.ebooks.metadata.opf import (
17    get_metadata as get_metadata_from_opf, set_metadata as set_metadata_opf
18)
19from calibre.ebooks.metadata.opf2 import OPF
20from calibre.utils.xml_parse import safe_xml_fromstring
21from calibre.ptempfile import TemporaryDirectory
22from calibre.utils.localunzip import LocalZipFile
23from calibre.utils.zipfile import BadZipfile, ZipFile, safe_replace
24
25
class EPubException(Exception):
    """Root of the EPUB error hierarchy defined in this module."""
28
29
class OCFException(EPubException):
    """EPubException subclass; not raised by any code visible in this file."""
32
33
class ContainerException(OCFException):
    """OCFException subclass; not raised by any code visible in this file."""
36
37
class Container(dict):
    """Mapping of rootfile media type -> full path, parsed from container XML.

    The instance is an ordinary ``dict`` that stays empty when no stream
    (or a falsy one) is supplied.
    """

    def __init__(self, stream=None):
        """Populate the mapping from *stream*.

        :param stream: object with a ``read()`` method yielding the XML of
            the container document, or a falsy value to create an empty
            mapping.
        :raises EPubException: if the root element's ``version`` attribute
            is not ``'1.0'``, if no ``rootfiles`` child exists, or if a
            ``rootfile`` lacks its ``media-type`` or ``full-path`` attribute.
        """
        if not stream:
            return

        root = safe_xml_fromstring(stream.read())
        version = root.get('version', None)
        if version != '1.0':
            raise EPubException("unsupported version of OCF")

        # Match on local-name() so the element is found whatever namespace
        # prefix the document uses.
        rootfiles_elems = root.xpath('./*[local-name()="rootfiles"]')
        if not rootfiles_elems:
            raise EPubException("<rootfiles/> element missing")

        # Only the first <rootfiles> element is consulted.
        for entry in rootfiles_elems[0].xpath('./*[local-name()="rootfile"]'):
            media_type = entry.get('media-type')
            full_path = entry.get('full-path')
            if not (media_type and full_path):
                raise EPubException("<rootfile/> element malformed")
            self[media_type] = full_path
54
55
class OCF:
    """Constants shared by the OCF readers; not directly instantiable."""

    # Expected content of the archive's ``mimetype`` entry.
    MIMETYPE = 'application/epub+zip'
    # Archive-relative location of the container document.
    CONTAINER_PATH = 'META-INF/container.xml'
    # Archive-relative location of the encryption manifest.
    ENCRYPTION_PATH = 'META-INF/encryption.xml'

    def __init__(self):
        """Always raise: subclasses must provide their own initialiser."""
        raise NotImplementedError('Abstract base class')
63
64
class Encryption:
    """Lookup table built from the XML of an encryption manifest.

    ``entries`` maps each cipher reference URI to the ``Algorithm``
    attribute of the encryption method declared alongside it.
    """

    # Algorithms that is_encrypted() does not count as encryption.
    OBFUSCATION_ALGORITHMS = frozenset(['http://ns.adobe.com/pdf/enc#RC',
            'http://www.idpf.org/2008/embedding'])

    def __init__(self, raw):
        """Parse *raw* (the manifest XML) into ``self.entries``.

        :param raw: XML data, or a falsy value when there is no manifest,
            in which case ``root`` is ``None`` and ``entries`` stays empty.
        """
        self.root = safe_xml_fromstring(raw) if raw else None
        self.entries = {}
        if self.root is None:
            return

        for method in self.root.xpath('descendant::*[contains(name(), "EncryptionMethod")]'):
            # The cipher reference is searched for below the method's parent
            # element, i.e. among the method's siblings and their descendants.
            references = method.getparent().xpath('descendant::*[contains(name(), "CipherReference")]')
            if not references:
                continue
            algorithm = method.get('Algorithm', '')
            uri = references[0].get('URI', '')
            if uri and algorithm:
                self.entries[uri] = algorithm

    def is_encrypted(self, uri):
        """Return True if *uri* is listed with a non-obfuscation algorithm."""
        algorithm = self.entries.get(uri, None)
        if algorithm is None:
            return False
        return algorithm not in self.OBFUSCATION_ALGORITHMS
85
86
class OCFReader(OCF):
    """Common logic for reading an OCF container.

    Subclasses must provide ``open(name)`` returning a binary file-like
    object for an entry, and must set ``self.root`` (passed to ``OPF``)
    before calling this initialiser.
    """

    def __init__(self):
        """Validate the container and locate the OPF package file.

        A missing or wrong ``mimetype`` entry only produces a printed
        warning; it never aborts reading.

        :raises EPubException: if the container document is missing
            (``open`` raised ``KeyError``), is malformed, or has no entry
            for the OPF media type.
        """
        try:
            mimetype = self.read_bytes('mimetype').decode('utf-8').rstrip()
            if mimetype != OCF.MIMETYPE:
                print('WARNING: Invalid mimetype declaration', mimetype)
        except Exception:
            # Deliberately best-effort, but no longer a bare ``except`` so
            # KeyboardInterrupt/SystemExit are not swallowed.
            print('WARNING: Epub doesn\'t contain a valid mimetype declaration')

        try:
            with closing(self.open(OCF.CONTAINER_PATH)) as f:
                self.container = Container(f)
        except KeyError as err:
            raise EPubException("missing OCF container.xml file") from err

        # Use .get(): indexing raised a bare KeyError for a container without
        # an OPF rootfile, so the intended EPubException below never fired.
        self.opf_path = self.container.get(OPF.MIMETYPE)
        if not self.opf_path:
            raise EPubException("missing OPF package file entry in container")
        self._opf_cached = self._encryption_meta_cached = None

    @property
    def opf(self):
        """The parsed OPF package, created on first access and then cached.

        :raises EPubException: if ``open`` raises ``KeyError`` for the OPF path.
        """
        if self._opf_cached is None:
            try:
                with closing(self.open(self.opf_path)) as f:
                    self._opf_cached = OPF(f, self.root, populate_spine=False)
            except KeyError as err:
                raise EPubException("missing OPF package file") from err
        return self._opf_cached

    @property
    def encryption_meta(self):
        """The ``Encryption`` table for this container, cached after first use.

        Any failure to read or parse the encryption manifest yields an empty
        table, so every entry is then reported as not encrypted.
        """
        if self._encryption_meta_cached is None:
            try:
                self._encryption_meta_cached = Encryption(self.read_bytes(self.ENCRYPTION_PATH))
            except Exception:
                self._encryption_meta_cached = Encryption(None)
        return self._encryption_meta_cached

    def read_bytes(self, name):
        """Return the full contents of entry *name* as bytes."""
        # Close the handle explicitly instead of leaving it to the garbage
        # collector.
        with closing(self.open(name)) as f:
            return f.read()
128
129
class OCFZipReader(OCFReader):
    """OCF reader backed by a ZIP archive."""

    def __init__(self, stream, mode='r', root=None):
        """Wrap *stream* and run the common container validation.

        :param stream: an already opened ``ZipFile``/``LocalZipFile``, or
            anything ``ZipFile`` accepts as its first argument.
        :param mode: mode passed to ``ZipFile`` when *stream* has to be opened.
        :param root: base directory stored as ``self.root``; when ``None`` it
            is the directory of ``stream.name`` if that attribute is set and
            truthy, otherwise the current working directory.
        :raises EPubException: if *stream* cannot be opened as a ZIP file.
        """
        if not isinstance(stream, (LocalZipFile, ZipFile)):
            try:
                self.archive = ZipFile(stream, mode=mode)
            except BadZipfile:
                raise EPubException("not a ZIP .epub OCF container")
        else:
            self.archive = stream

        self.root = root
        if self.root is None:
            stream_name = getattr(stream, 'name', False)
            self.root = os.path.abspath(os.path.dirname(stream_name)) if stream_name else os.getcwd()
        super().__init__()

    def open(self, name):
        """Return a binary file-like object for the archive entry *name*."""
        if not isinstance(self.archive, LocalZipFile):
            # Other archives are read fully and served from memory.
            return io.BytesIO(self.archive.read(name))
        return self.archive.open(name)

    def read_bytes(self, name):
        """Return the bytes of the archive entry *name*."""
        data = self.archive.read(name)
        return data
156
157
def get_zip_reader(stream, root=None):
    """Return an ``OCFZipReader`` for *stream*.

    ``ZipFile`` is tried first; if it raises any exception the stream is
    rewound and opened with ``LocalZipFile`` instead.
    """
    try:
        archive = ZipFile(stream, mode='r')
    except Exception:
        stream.seek(0)
        archive = LocalZipFile(stream)
    reader = OCFZipReader(archive, root=root)
    return reader
165
166
class OCFDirReader(OCFReader):
    """OCF reader for a container stored as a directory tree."""

    def __init__(self, path):
        """Use the directory *path* as the container root."""
        self.root = path
        super().__init__()

    def open(self, path):
        """Open the entry *path*, relative to the root, for binary reading."""
        # NOTE(review): ``lopen`` is not imported in this file; presumably it
        # is provided as a builtin by calibre's startup code -- confirm.
        full_path = os.path.join(self.root, path)
        return lopen(full_path, 'rb')

    def read_bytes(self, path):
        """Return the complete contents of the entry *path* as bytes."""
        handle = self.open(path)
        with handle:
            return handle.read()
179
180
def render_cover(cpage, zf, reader=None):
    """Render the page *cpage* of the archive *zf* via ``render_html_svg_workaround``.

    The whole archive is extracted into a temporary directory, which is made
    the current directory for the duration of the call.

    :param cpage: archive-relative path of the page to render; a falsy value
        means there is nothing to render.
    :param zf: archive object providing ``extractall()``.
    :param reader: optional OCF reader; when given, a page its encryption
        table reports as encrypted is not rendered.
    :return: whatever ``render_html_svg_workaround`` returns, or ``None`` when
        there is no page, the page is encrypted, or it is absent after
        extraction.
    """
    if not cpage:
        return None
    if reader is not None and reader.encryption_meta.is_encrypted(cpage):
        return None

    # Imported only once rendering is actually required, so the cheap
    # early-return paths above do not pay for loading the rendering code.
    from calibre.ebooks import render_html_svg_workaround
    from calibre.utils.logging import default_log

    with TemporaryDirectory('_epub_meta') as tdir, CurrentDir(tdir):
        # extractall() is called without a target, hence the CurrentDir above.
        zf.extractall()
        cpage = os.path.join(tdir, cpage)
        if not os.path.exists(cpage):
            return None
        return render_html_svg_workaround(cpage, default_log)
197
198
def get_cover(raster_cover, first_spine_item, reader):
    """Return cover data for the book behind *reader*, or ``None``.

    If *raster_cover* is given, its bytes are returned directly, unless the
    entry is encrypted, in which case ``None`` is returned with no fallback.
    If there is no raster cover, or reading it fails, the result of rendering
    *first_spine_item* with ``render_cover`` is returned instead.
    """
    archive = reader.archive

    if raster_cover:
        encrypted = reader.encryption_meta.is_encrypted(raster_cover)
        if encrypted:
            return None
        try:
            cover_bytes = reader.read_bytes(raster_cover)
        except Exception:
            # An unreadable raster cover falls through to the rendered one.
            pass
        else:
            return cover_bytes

    return render_cover(first_spine_item, archive, reader=reader)
211
212
def get_metadata(stream, extract_cover=True):
    """Read the metadata of the EPUB in *stream*.

    :param stream: seekable binary stream holding the EPUB; it is rewound
        before reading.
    :param extract_cover: when true, also try to obtain the cover and store
        it in ``mi.cover_data``. A failure there is printed as a traceback
        and otherwise ignored.
    :return: the metadata object produced by ``get_metadata_from_opf``, with
        its ``timestamp`` reset to ``None``.
    """
    stream.seek(0)
    reader = get_zip_reader(stream)
    opf_raw = reader.read_bytes(reader.opf_path)
    mi, _ver, raster_cover, first_spine_item = get_metadata_from_opf(opf_raw)

    if extract_cover:
        # The cover and spine paths are joined onto the OPF file's directory
        # to make them archive-relative.
        opf_dir = posixpath.dirname(reader.opf_path)

        def archive_path(href):
            return posixpath.normpath(posixpath.join(opf_dir, href))

        raster_cover = archive_path(raster_cover) if raster_cover else raster_cover
        first_spine_item = archive_path(first_spine_item) if first_spine_item else first_spine_item
        try:
            cover = get_cover(raster_cover, first_spine_item, reader)
            if cover is not None:
                mi.cover_data = ('jpg', cover)
        except Exception:
            import traceback
            traceback.print_exc()

    mi.timestamp = None
    return mi
234
235
def get_quick_metadata(stream):
    """Return the metadata of the EPUB in *stream* without extracting a cover."""
    return get_metadata(stream, extract_cover=False)
238
239
def serialize_cover_data(new_cdata, cpath):
    """Pass *new_cdata* to ``save_cover_data_to`` in the format named by *cpath*'s extension.

    The format handed over is the extension of *cpath* without its leading
    dot; the return value is whatever ``save_cover_data_to`` returns.
    """
    from calibre.utils.img import save_cover_data_to
    extension = os.path.splitext(cpath)[1]
    return save_cover_data_to(new_cdata, data_fmt=extension[1:])
243
244
def _read_new_cover_data(mi):
    """Return the replacement cover data carried by *mi*, best effort.

    ``mi.cover_data[1]`` is preferred; when it is missing or falsy the file
    named by ``mi.cover`` is read instead. If that also fails, whatever
    ``mi.cover_data[1]`` held (or ``None``) is returned.
    """
    cdata = None
    try:
        cdata = mi.cover_data[1]
    except Exception:
        # The metadata object may have no usable cover_data; try the file.
        pass
    if cdata:
        return cdata
    try:
        # NOTE(review): ``lopen`` is not imported in this file; presumably it
        # is provided as a builtin by calibre's startup code -- confirm.
        with lopen(mi.cover, 'rb') as f:
            return f.read()
    except Exception:
        # No readable cover file either: proceed without a new cover.
        return cdata


def _discard_cover_replacement(replacement):
    """Close and delete *replacement* if it is a named file-like object."""
    close = getattr(replacement, 'close', None)
    if close is None:
        # None or an in-memory payload: nothing to release.
        return
    try:
        close()
        os.remove(replacement.name)
    except Exception:
        # Cleanup runs from a ``finally`` block and must never mask the
        # outcome of writing the archive.
        pass


def set_metadata(stream, mi, apply_null=False, update_timestamp=False, force_identifiers=False, add_missing_cover=True):
    """Write the metadata *mi* into the EPUB held in *stream*.

    The OPF file is rewritten by ``set_metadata_opf``. If *mi* supplies cover
    data and the OPF names a raster cover that is an unencrypted
    ``.png``/``.jpg``/``.jpeg`` file, that file is replaced as well.

    :param stream: seekable binary stream containing the EPUB; rewound first.
    :param mi: metadata object; ``cover_data`` and ``cover`` are consulted
        for a new cover.
    :param apply_null: forwarded to ``set_metadata_opf``.
    :param update_timestamp: forwarded to ``set_metadata_opf``.
    :param force_identifiers: forwarded to ``set_metadata_opf``.
    :param add_missing_cover: forwarded to ``set_metadata_opf``.
    """
    stream.seek(0)
    reader = get_zip_reader(stream, root=os.getcwd())
    new_cdata = _read_new_cover_data(mi)

    opf_dir = posixpath.dirname(reader.opf_path)
    opfbytes, _ver, raster_cover = set_metadata_opf(
        reader.read_bytes(reader.opf_path), mi, cover_prefix=opf_dir,
        cover_data=new_cdata, apply_null=apply_null, update_timestamp=update_timestamp,
        force_identifiers=force_identifiers, add_missing_cover=add_missing_cover)

    cpath = None
    replacements = {}
    if new_cdata and raster_cover:
        try:
            cpath = posixpath.join(opf_dir, raster_cover)
            cover_replacable = not reader.encryption_meta.is_encrypted(cpath) and \
                    os.path.splitext(cpath)[1].lower() in ('.png', '.jpg', '.jpeg')
            if cover_replacable:
                replacements[cpath] = serialize_cover_data(new_cdata, cpath)
        except Exception:
            # A cover that cannot be serialised must not stop the metadata
            # update itself; report it and carry on.
            import traceback
            traceback.print_exc()

    opf_name = reader.container[OPF.MIMETYPE]
    try:
        if isinstance(reader.archive, LocalZipFile):
            reader.archive.safe_replace(opf_name, opfbytes,
                extra_replacements=replacements, add_missing=True)
        else:
            safe_replace(stream, opf_name, opfbytes,
                extra_replacements=replacements, add_missing=True)
    finally:
        # Release the serialised cover even when the archive write fails.
        _discard_cover_replacement(replacements.get(cpath))
290