1# greenthreads.py -- Utility module for querying an ObjectStore with gevent
2# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
3#
4# Author: Fabien Boucher <fabien.boucher@enovance.com>
5#
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation;
# version 2.0 or (at your option) any later version. You can redistribute it
# and/or modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
22
23"""Utility module for querying an ObjectStore with gevent."""
24
25import gevent
26from gevent import pool
27
28from dulwich.objects import (
29    Commit,
30    Tag,
31    )
32from dulwich.object_store import (
33    MissingObjectFinder,
34    _collect_filetree_revs,
35    ObjectStoreIterator,
36    )
37
38
def _split_commits_and_tags(obj_store, lst,
                            ignore_unknown=False, pool=None):
    """Split object id list into two list with commit SHA1s and tag SHA1s.

    Same implementation as object_store._split_commits_and_tags
    except we use gevent to parallelize object retrieval.

    :param obj_store: Store indexed by object id; ``obj_store[sha]`` is
        expected to raise KeyError for an id it does not contain
    :param lst: Iterable of object ids to classify
    :param ignore_unknown: When true, ids for which the store raises
        KeyError are skipped instead of failing the call
    :param pool: Optional gevent pool whose ``spawn`` runs the lookups;
        when None each lookup runs in an unpooled greenlet
    :return: Tuple ``(commits, tags)`` of sets. An id naming a Commit is
        added to commits; an id naming a Tag is added to tags and the id
        stored in the tag's ``object[1]`` is added to commits
    :raise KeyError: If an id is unknown (and ignore_unknown is false) or
        names an object that is neither a Commit nor a Tag
    """
    commits = set()
    tags = set()

    def find_commit_type(sha):
        try:
            o = obj_store[sha]
        except KeyError:
            if not ignore_unknown:
                raise
        else:
            if isinstance(o, Commit):
                commits.add(sha)
            elif isinstance(o, Tag):
                tags.add(sha)
                commits.add(o.object[1])
            else:
                raise KeyError('Not a commit or a tag: %s' % sha)

    # ``pool`` is the argument here (it shadows the gevent ``pool`` module
    # imported at the top of the file), so the fallback goes through
    # ``gevent.spawn`` rather than trying to build a Pool.
    spawn = gevent.spawn if pool is None else pool.spawn
    jobs = [spawn(find_commit_type, s) for s in lst]
    # Without raise_error a KeyError raised inside a greenlet is only stored
    # on the greenlet and the caller would receive partial sets.
    gevent.joinall(jobs, raise_error=True)
    return (commits, tags)
66
67
class GreenThreadsMissingObjectFinder(MissingObjectFinder):
    """Find the objects missing from another object store.

    Same implementation as object_store.MissingObjectFinder
    except we use gevent to parallelize object retrieval.
    """
    def __init__(self, object_store, haves, wants,
                 progress=None, get_tagged=None,
                 concurrency=1, get_parents=None):
        """Work out which objects still have to be sent.

        The base class initializer is not invoked; every attribute set here
        (``object_store``, ``sha_done``, ``objects_to_send``, ``progress``
        and ``_tagged``) is assigned directly by this method.

        :param object_store: Object store the ids are looked up in
        :param haves: Iterable of object ids, classified with
            ``ignore_unknown=True``
        :param wants: Iterable of object ids, classified with
            ``ignore_unknown=False``
        :param progress: Optional callable taking one argument; a no-op is
            installed when omitted
        :param get_tagged: Optional callable whose result is stored in
            ``self._tagged``; an empty dict is stored when it is omitted or
            returns a falsy value
        :param concurrency: Size of the gevent pool used for the lookups
        :param get_parents: Optional callable forwarded to
            ``object_store._collect_ancestors`` when supplied
        """
        self.object_store = object_store
        self.sha_done = set()
        p = pool.Pool(size=concurrency)

        def collect_tree_sha(sha):
            # Mark the commit itself, then everything reachable from its
            # tree, as already present on the other side.
            self.sha_done.add(sha)
            cmt = object_store[sha]
            _collect_filetree_revs(object_store, cmt.tree, self.sha_done)

        have_commits, have_tags = _split_commits_and_tags(
            object_store, haves, True, p)
        want_commits, want_tags = _split_commits_and_tags(
            object_store, wants, False, p)

        # get_parents used to be accepted and then silently dropped. It is
        # forwarded only when supplied so the store's own default applies
        # otherwise.
        # NOTE(review): assumes _collect_ancestors accepts a ``get_parents``
        # keyword -- confirm against the object store in use.
        ancestor_kwargs = {}
        if get_parents is not None:
            ancestor_kwargs['get_parents'] = get_parents
        all_ancestors = object_store._collect_ancestors(
            have_commits, **ancestor_kwargs)[0]
        missing_commits, common_commits = object_store._collect_ancestors(
            want_commits, all_ancestors, **ancestor_kwargs)

        jobs = [p.spawn(collect_tree_sha, c) for c in common_commits]
        # Surface a failed lookup instead of leaving sha_done incomplete.
        gevent.joinall(jobs, raise_error=True)
        self.sha_done.update(have_tags)

        missing_tags = want_tags.difference(have_tags)
        to_send = missing_commits.union(missing_tags)
        self.objects_to_send = set((w, None, False) for w in to_send)

        if progress is None:
            self.progress = lambda x: None
        else:
            self.progress = progress

        tagged = get_tagged() if get_tagged else None
        self._tagged = tagged or {}
109
110
class GreenThreadsObjectStoreIterator(ObjectStoreIterator):
    """ObjectIterator that works on top of an ObjectStore.

    Same implementation as object_store.ObjectStoreIterator
    except we use gevent to parallelize object retrieval.
    """
    def __init__(self, store, shas, finder, concurrency=1):
        """Create the iterator.

        :param store: Object store, passed on to the base initializer
        :param shas: Passed on to the base initializer unchanged
        :param finder: Object exposing ``objects_to_send`` and ``next()``,
            used by ``__len__`` to enumerate the entries
        :param concurrency: Size of the gevent pool used for retrieval
        """
        self.finder = finder
        self.p = pool.Pool(size=concurrency)
        super(GreenThreadsObjectStoreIterator, self).__init__(store, shas)

    def retrieve(self, args):
        """Look up one entry.

        :param args: A ``(sha, path)`` pair
        :return: Tuple ``(self.store[sha], path)``
        """
        sha, path = args
        return self.store[sha], path

    def __iter__(self):
        """Yield ``(object, path)`` pairs for every entry of itershas().

        Lookups run on the pool through ``imap_unordered``, so the pairs are
        not guaranteed to come back in the order itershas() produced them.
        """
        for obj, path in self.p.imap_unordered(self.retrieve,
                                               self.itershas()):
            yield obj, path

    def __len__(self):
        """Return the number of entries, draining the finder if necessary.

        When ``self._shas`` is already populated its length is returned
        as-is. Otherwise ``finder.next`` is run on the pool until
        ``finder.objects_to_send`` is empty and every non-None result is
        appended to ``self._shas``.
        """
        # NOTE(review): self._shas is not assigned in this class; presumably
        # the base initializer creates it -- confirm.
        if len(self._shas) > 0:
            return len(self._shas)
        while len(self.finder.objects_to_send):
            # One job per entry queued at this instant; running a job may
            # queue further entries, hence the enclosing while loop.
            pending = len(self.finder.objects_to_send)
            jobs = [self.p.spawn(self.finder.next) for _ in range(pending)]
            # A greenlet that raised also has value None, so without
            # raise_error a failure would be dropped and the count would
            # silently come up short.
            gevent.joinall(jobs, raise_error=True)
            for j in jobs:
                if j.value is not None:
                    self._shas.append(j.value)
        return len(self._shas)
143