1# -*- coding: utf-8 -*-
2# -----------------------------------------------------------------------------
3# Name:         corpus/manager.py
4# Purpose:      Manage multiple corpora
5#
6# Authors:      Christopher Ariza
7#               Josiah Wolf Oberholtzer
8#               Michael Scott Cuthbert
9#
10# Copyright:    Copyright © 2009, 2013, 2015-17 Michael Scott Cuthbert and the music21 Project
11# License:      BSD, see license.txt
12# -----------------------------------------------------------------------------
13'''
14The manager module handles requests across multiple corpora.  It should be the default
15interface to searching corpora.
16
17New in v3 -- previously most were static methods on corpus.corpora.Corpus, but that
18seemed inappropriate since these work across corpora.
19'''
20import pathlib
21import os
22
23from music21 import common
24from music21 import converter
25from music21 import environment
26from music21 import metadata
27
28from music21.corpus import corpora
29from music21.exceptions21 import CorpusException
30
# In-memory cache of metadata bundles, keyed by corpus name.  A value of None
# means the bundle for that corpus has not been read from disk yet; names of
# other local corpora are added as keys when their bundles are first cached.
# (A 'virtual' key used to live here as well.)
_metadataBundles = dict.fromkeys(('core', 'local'))
36
37# -----------------------------------------------------------------------------
38
39
def fromName(name):
    '''
    Return a corpus object for the given `name`.

    'core' gives the core corpus and 'local' the default local corpus:

    >>> corpus.manager.fromName('core')
    <music21.corpus.corpora.CoreCorpus>

    >>> corpus.manager.fromName('local')
    <music21.corpus.corpora.LocalCorpus: 'local'>

    >>> corpus.manager.fromName(None)
    <music21.corpus.corpora.LocalCorpus: 'local'>


    Any other name is passed to `LocalCorpus` as its name.  Note that this
    corpus probably does not exist on disk, but it's ready to have
    paths added to it and to be stored on disk.

    >>> corpus.manager.fromName('testDummy')
    <music21.corpus.corpora.LocalCorpus: 'testDummy'>
    '''
    # A 'virtual' corpus was formerly handled here as well.
    if name == 'core':
        return corpora.CoreCorpus()
    if name == 'local':
        return corpora.LocalCorpus()
    # everything else (including None) is treated as a named local corpus
    return corpora.LocalCorpus(name=name)
71
72
def iterateCorpora(returnObjects=True):
    '''
    Generator over every known corpus, for use in searches that span all corpora.

    With `returnObjects=True` (the default) corpus objects are yielded; with any
    other value the corpus names are yielded instead.  The core corpus always
    comes first, followed by the local corpora in the order given by
    `listLocalCorporaNames()`.

    This test will only show the first two, because it needs to run the same
    on every system:

    >>> for i, corpusObject in enumerate(corpus.manager.iterateCorpora()):
    ...     print(corpusObject)
    ...     if i == 1:
    ...        break
    <music21.corpus.corpora.CoreCorpus>
    <music21.corpus.corpora.LocalCorpus: 'local'>

    We can also get names instead... Note that the name of the main local corpus is 'local' not
    None

    >>> for i, corpusName in enumerate(corpus.manager.iterateCorpora(returnObjects=False)):
    ...     print(corpusName)
    ...     if i == 1:
    ...        break
    core
    local

    New in v.3
    '''
    if returnObjects is True:
        yield corpora.CoreCorpus()
        # local names are looked up only after the core corpus has been consumed
        yield from (corpora.LocalCorpus(localName)
                    for localName in listLocalCorporaNames())
        return

    yield corpora.CoreCorpus().name
    for localName in listLocalCorporaNames():
        # the default local corpus is listed as None but is named 'local'
        yield 'local' if localName is None else localName
113
114
def getWork(workName,
            movementNumber=None,
            fileExtensions=None,
            ):
    '''
    Search all corpora for a work and return the path(s) of the matching file(s).

    Each corpus from `iterateCorpora()` is asked in turn for its work list
    matching `workName`, `movementNumber` and `fileExtensions`; the first corpus
    that gives a non-empty answer is used.  If `workName` ends in '.xml' or
    '.musicxml' and a corpus has no match, that corpus is asked again with the
    extension replaced by '.mxl', since the file might be stored compressed.

    Returns a single `pathlib.Path` when exactly one file matched, otherwise a
    list of `pathlib.Path` objects.

    Raises a `CorpusException` if `workName` is None or empty, or if no corpus
    has a matching work.

    `parse()` in this module uses this function to locate the file that it parses.
    '''
    nameAsString = str(workName)

    if workName in (None, ''):
        raise CorpusException(
            'a work name must be provided as an argument')
    if not common.isListLike(fileExtensions):
        fileExtensions = [fileExtensions]

    # an uncompressed MusicXML name might exist only as a compressed MXL file
    tryMxl = nameAsString.endswith(('.xml', '.musicxml'))
    if tryMxl:
        mxlName = os.path.splitext(nameAsString)[0] + '.mxl'
    else:
        mxlName = nameAsString

    foundPaths = None
    for corpusObject in iterateCorpora():
        candidates = corpusObject.getWorkList(workName, movementNumber, fileExtensions)
        if not candidates and tryMxl:
            candidates = corpusObject.getWorkList(mxlName, movementNumber, fileExtensions)
        if candidates:
            foundPaths = candidates
            break

    if foundPaths is None:
        messageParts = ['Could not find a']
        if tryMxl:
            messageParts.append('n xml or mxl')
        messageParts.append(f' work that met this criterion: {workName};')
        messageParts.append(' if you are searching for a file on disk, ')
        messageParts.append('use "converter" instead of "corpus".')
        raise CorpusException(''.join(messageParts))

    if len(foundPaths) == 1:
        return pathlib.Path(foundPaths[0])
    return [pathlib.Path(found) for found in foundPaths]
163
164
165# pylint: disable=redefined-builtin
166# noinspection PyShadowingBuiltins
def parse(workName,
          movementNumber=None,
          number=None,
          fileExtensions=None,
          forceSource=False,
          format=None  # @ReservedAssignment
          ):
    '''
    Find a work in the corpora with `getWork()` and parse it with `converter.parse()`.

    `workName`, `movementNumber` and `fileExtensions` are handed to `getWork()`;
    `forceSource`, `number` and `format` are handed to `converter.parse()`.
    If `getWork()` returns several paths, only the first one is parsed.

    The parsed stream gets a `corpusFilepath` attribute (set by
    `_addCorpusFilepathToStreamObject`) before it is returned.
    '''
    located = getWork(
        workName=workName,
        movementNumber=movementNumber,
        fileExtensions=fileExtensions,
    )
    # getWork gives a list when more than one file matched; take the first
    filePath = located[0] if isinstance(located, list) else located

    streamObject = converter.parse(
        filePath,
        forceSource=forceSource,
        number=number,
        format=format
    )
    _addCorpusFilepathToStreamObject(streamObject, filePath)
    return streamObject
189
190
def _addCorpusFilepathToStreamObject(streamObj, filePath):
    '''
    Adds an entry 'corpusFilepath' to the Stream object, for use later in
    identifying the score.

    If `filePath` lies inside the directory given by `common.getCorpusFilePath()`,
    the value stored is the path relative to that directory, using '/' as the
    separator on every platform.  Otherwise the whole `filePath` is stored
    (as a string).

    A path counts as being inside the corpus directory only when the directory
    name is followed by a path separator (or the two paths are equal), so a
    sibling directory whose name merely starts with the same characters
    (e.g. ".../corpus2" next to ".../corpus") is stored unchanged.

    TODO: this should work for non-core-corpora
    TODO: this should be in the metadata object
    TODO: this should set a pathlib.Path object
    '''
    corpusRoot = str(common.getCorpusFilePath())
    filePath = str(filePath)

    # A bare startswith(corpusRoot) would also match sibling directories that
    # share the prefix, and the slice below would then cut the path in the
    # wrong place; insist on a separator boundary.
    insideCorpus = (filePath == corpusRoot
                    or filePath.startswith(corpusRoot + os.sep))
    if not insideCorpus:
        streamObj.corpusFilepath = filePath
        return

    # drop the corpus directory and the separator that follows it
    relativePath = filePath[len(corpusRoot) + len(os.sep):]
    # corpus fix for windows: always store forward slashes
    streamObj.corpusFilepath = '/'.join(relativePath.split(os.sep))
215
216
def search(query=None, field=None, corpusNames=None, fileExtensions=None, **kwargs):
    '''
    Search the stored metadata bundles of one or more corpora and return the
    combined results as a single MetadataBundle.

    This function uses stored metadata and thus, on first usage, will incur a
    performance penalty during metadata loading.

    >>> #_DOCS_SHOW corpus.search('china')
    >>> corpus.search('china', corpusNames=('core',))  #_DOCS_HIDE
    <music21.metadata.bundles.MetadataBundle {1235 entries}>

    >>> #_DOCS_SHOW corpus.search('china', fileExtensions='.mid')
    >>> corpus.search('china', fileExtensions='.mid', corpusNames=('core',))  #_DOCS_HIDE
    <music21.metadata.bundles.MetadataBundle {0 entries}>

    >>> #_DOCS_SHOW corpus.search('bach', field='composer')
    >>> corpus.search('bach', field='composer', corpusNames=('core',))  #_DOCS_HIDE
    <music21.metadata.bundles.MetadataBundle {363 entries}>

    Note the importance of good metadata -- there's almost 400 pieces by
    Bach in the corpus, but many do not have correct metadata entries.

    This can also be specified as:

    >>> #_DOCS_SHOW corpus.search(composer='bach')
    >>> corpus.search(composer='bach', corpusNames=('core',))  #_DOCS_HIDE
    <music21.metadata.bundles.MetadataBundle {363 entries}>

    Or, to get all the chorales (without using `corpus.chorales.Iterator`):

    >>> #_DOCS_SHOW corpus.search(sourcePath='bach', numberOfParts=4)
    >>> corpus.search(sourcePath='bach', numberOfParts=4, corpusNames=('core',))  #_DOCS_HIDE
    <music21.metadata.bundles.MetadataBundle {368 entries}>


    This function is implemented in `corpus.manager` as a method there but also directly
    available in the corpus module for ease of use.

    The ``corpusNames`` parameter can be used to specify which corpora to search,
    for example:

    >>> corpus.manager.search(
    ...     'bach',
    ...     corpusNames=('core',),
    ...     )
    <music21.metadata.bundles.MetadataBundle {564 entries}>

    If ``corpusNames`` is None, all corpora known to music21 will be searched.

    See usersGuide (chapter 11) for more information on searching

    '''
    # >>> corpus.search('coltrane', corpusNames=('virtual',))
    # <music21.metadata.bundles.MetadataBundle {1 entry}>

    # make sure every corpus has its bundle cached before searching
    readAllMetadataBundlesFromDisk()
    combinedResults = metadata.bundles.MetadataBundle()

    if corpusNames is None:
        corpusNames = list(iterateCorpora(returnObjects=False))

    for corpusName in corpusNames:
        bundle = fromName(corpusName).metadataBundle
        matches = bundle.search(
            query, field, fileExtensions=fileExtensions, **kwargs)
        combinedResults = combinedResults.union(matches)

    return combinedResults
285
286
def getMetadataBundleByCorpus(corpusObject):
    '''
    Return the cached metadata bundle belonging to one Corpus object,
    caching it first (via `cacheMetadataBundleFromDisk`) if necessary.

    >>> cc = corpus.corpora.CoreCorpus()
    >>> mdb1 = corpus.manager.getMetadataBundleByCorpus(cc)
    >>> mdb1
    <music21.metadata.bundles.MetadataBundle 'core': {... entries}>

    This is the same as calling `metadataBundle` on the corpus itself,
    but this is the routine that actually does the work. In other words,
    it's the call on the object that is redundant, not this routine.

    >>> mdb1 is cc.metadataBundle
    True

    Non-existent corpus...

    >>> lc = corpus.corpora.LocalCorpus('junk')
    >>> mdb1 = corpus.manager.getMetadataBundleByCorpus(lc)
    >>> mdb1
    <music21.metadata.bundles.MetadataBundle 'junk': {0 entries}>

    Raises a `CorpusException` if no bundle is cached under the corpus' name
    after the caching step.
    '''
    cacheMetadataBundleFromDisk(corpusObject)
    name = corpusObject.name
    if name not in _metadataBundles:  # pragma: no cover
        raise CorpusException('No metadata bundle found for corpus {0} with name {1}'.format(
            corpusObject, name))
    return _metadataBundles[name]
318
319
def cacheMetadataBundleFromDisk(corpusObject):
    r'''
    Make sure the metadata bundle for `corpusObject` is in the module-level cache.

    If nothing (or None) is cached under the corpus' name, a new MetadataBundle
    is created for that name, read, validated, and stored in the cache.
    If a bundle is already cached, nothing happens.
    '''
    corpusName = corpusObject.name
    if _metadataBundles.get(corpusName) is not None:
        # already cached -- nothing to do
        return

    freshBundle = metadata.bundles.MetadataBundle(corpusName)
    freshBundle.read()
    freshBundle.validate()
    # _metadataBundles needs TypedDict.
    # noinspection PyTypeChecker
    _metadataBundles[corpusName] = freshBundle
333
334
def readAllMetadataBundlesFromDisk():
    '''
    Cache the metadata bundle of every corpus yielded by `iterateCorpora()`
    by calling `cacheMetadataBundleFromDisk` on each one.
    '''
    for eachCorpus in iterateCorpora():
        cacheMetadataBundleFromDisk(eachCorpus)
341
342
def listLocalCorporaNames(skipNone=False):
    '''
    Return a list with the names of all user-defined local corpora, taken from
    the keys of the 'localCorporaSettings' entry in the user settings.

    Unless `skipNone` is true, the list starts with None, which
    refers to the default local corpus.
    '''
    userSettings = environment.UserSettings()
    leading = [] if skipNone else [None]
    return leading + list(userSettings['localCorporaSettings'].keys())
356
357
def listSearchFields():
    r'''
    Return a sorted tuple of all available search field names
    (the `searchAttributes` of `metadata.RichMetadata`):

    >>> for field in corpus.manager.listSearchFields():
    ...     field
    ...
    'actNumber'
    'alternativeTitle'
    'ambitus'
    'associatedWork'
    'collectionDesignation'
    'commission'
    'composer'
    'copyright'
    ...
    '''
    fieldNames = sorted(metadata.RichMetadata.searchAttributes)
    return tuple(fieldNames)
376
377# -----------------------------------------------------------------------------
378
379
# When this file is run directly as a script, hand control to music21's
# mainTest() entry point.
if __name__ == '__main__':
    import music21
    music21.mainTest()
383