# greenthreads.py -- Utility module for querying an ObjectStore with gevent
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Fabien Boucher <fabien.boucher@enovance.com>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Utility module for querying an ObjectStore with gevent."""

import gevent
from gevent import pool

from dulwich.objects import (
    Commit,
    Tag,
    )
from dulwich.object_store import (
    MissingObjectFinder,
    _collect_filetree_revs,
    ObjectStoreIterator,
    )


def _split_commits_and_tags(obj_store, lst,
                            ignore_unknown=False, pool=None):
    """Split object id list into two list with commit SHA1s and tag SHA1s.

    Same implementation as object_store._split_commits_and_tags
    except we use gevent to parallelize object retrieval.

    :param obj_store: Mapping-like store; objects are fetched with
        ``obj_store[sha]``.
    :param lst: Iterable of object ids to classify.
    :param ignore_unknown: When true, ids missing from ``obj_store`` are
        skipped instead of raising ``KeyError``.
    :param pool: gevent pool used to spawn the lookups. When ``None`` a
        pool of size 1 is created.
    :return: Tuple ``(commits, tags)`` of sets of ids. The target of every
        tag found is added to ``commits``.
    :raise KeyError: If an id is missing (and ``ignore_unknown`` is false)
        or refers to an object that is neither a commit nor a tag.
    """
    if pool is None:
        # The ``pool`` parameter shadows the module-level ``pool`` import,
        # so the Pool class has to be reached through the gevent package.
        pool = gevent.pool.Pool(size=1)
    commits = set()
    tags = set()

    def find_commit_type(sha):
        try:
            o = obj_store[sha]
        except KeyError:
            if not ignore_unknown:
                raise
        else:
            if isinstance(o, Commit):
                commits.add(sha)
            elif isinstance(o, Tag):
                tags.add(sha)
                commits.add(o.object[1])
            else:
                raise KeyError('Not a commit or a tag: %s' % sha)

    jobs = [pool.spawn(find_commit_type, s) for s in lst]
    # raise_error=True re-raises an exception from a failed greenlet in the
    # caller; without it the KeyErrors raised above would be silently lost.
    gevent.joinall(jobs, raise_error=True)
    return (commits, tags)


class GreenThreadsMissingObjectFinder(MissingObjectFinder):
    """Find the objects missing from another object store.

    Same implementation as object_store.MissingObjectFinder
    except we use gevent to parallelize object retrieval.
    """
    def __init__(self, object_store, haves, wants,
                 progress=None, get_tagged=None,
                 concurrency=1, get_parents=None):
        """Compute the initial set of objects to send.

        :param object_store: Store the objects are read from; must provide
            item lookup and ``_collect_ancestors``.
        :param haves: Ids already present on the other side; unknown ids
            are ignored.
        :param wants: Ids requested by the other side.
        :param progress: Optional callable taking one argument; a no-op is
            used when ``None``.
        :param get_tagged: Optional callable whose result is stored as
            ``self._tagged`` (an empty dict when absent or falsy).
        :param concurrency: Size of the gevent pool used for lookups.
        :param get_parents: Accepted but not used by this initializer.
        """
        # NOTE(review): the base-class initializer is not called and
        # ``get_parents`` is ignored here -- confirm that the base class
        # does not rely on state derived from them.

        def collect_tree_sha(sha):
            self.sha_done.add(sha)
            cmt = object_store[sha]
            _collect_filetree_revs(object_store, cmt.tree, self.sha_done)

        self.object_store = object_store
        p = pool.Pool(size=concurrency)

        have_commits, have_tags = \
            _split_commits_and_tags(object_store, haves,
                                    True, p)
        want_commits, want_tags = \
            _split_commits_and_tags(object_store, wants,
                                    False, p)
        all_ancestors = object_store._collect_ancestors(have_commits)[0]
        missing_commits, common_commits = \
            object_store._collect_ancestors(want_commits, all_ancestors)

        # Everything reachable from the common commits is marked as done.
        self.sha_done = set()
        jobs = [p.spawn(collect_tree_sha, c) for c in common_commits]
        gevent.joinall(jobs)
        self.sha_done.update(have_tags)
        missing_tags = want_tags.difference(have_tags)
        wants = missing_commits.union(missing_tags)
        self.objects_to_send = {(w, None, False) for w in wants}
        if progress is None:
            self.progress = lambda x: None
        else:
            self.progress = progress
        # A missing callback or a falsy result both yield an empty dict.
        tagged = get_tagged() if get_tagged else None
        self._tagged = tagged or {}


class GreenThreadsObjectStoreIterator(ObjectStoreIterator):
    """ObjectIterator that works on top of an ObjectStore.

    Same implementation as object_store.ObjectStoreIterator
    except we use gevent to parallelize object retrieval.
    """
    def __init__(self, store, shas, finder, concurrency=1):
        """Create the iterator.

        :param store: Object store, forwarded to the base initializer.
        :param shas: Ids to iterate, forwarded to the base initializer.
        :param finder: Object whose ``objects_to_send`` and ``next`` are
            used by ``__len__`` to discover the ids.
        :param concurrency: Size of the gevent pool used for lookups.
        """
        self.finder = finder
        self.p = pool.Pool(size=concurrency)
        super(GreenThreadsObjectStoreIterator, self).__init__(store, shas)

    def retrieve(self, args):
        """Return ``(object, path)`` for a ``(sha, path)`` pair."""
        sha, path = args
        return self.store[sha], path

    def __iter__(self):
        """Yield ``(object, path)`` pairs retrieved through the pool.

        ``imap_unordered`` is used, so pairs are not necessarily yielded in
        the order produced by ``itershas()``.
        """
        for obj, path in self.p.imap_unordered(self.retrieve,
                                               self.itershas()):
            yield obj, path

    def __len__(self):
        """Return the number of ids, draining the finder if needed."""
        if self._shas:
            return len(self._shas)
        while self.finder.objects_to_send:
            # One job per entry currently pending in the finder.
            pending = len(self.finder.objects_to_send)
            jobs = [self.p.spawn(self.finder.next) for _ in range(pending)]
            gevent.joinall(jobs)
            for j in jobs:
                if j.value is not None:
                    self._shas.append(j.value)
        return len(self._shas)