1"""Functions for GitHub API requests.""" 2 3import getpass 4import json 5import os 6import re 7import sys 8 9import requests 10 11try: 12 import requests_cache 13except ImportError: 14 print("no cache", file=sys.stderr) 15else: 16 requests_cache.install_cache("gh_api", expire_after=3600) 17 18# Keyring stores passwords by a 'username', but we're not storing a username and 19# password 20fake_username = 'ipython_tools' 21 22class Obj(dict): 23 """Dictionary with attribute access to names.""" 24 def __getattr__(self, name): 25 try: 26 return self[name] 27 except KeyError as err: 28 raise AttributeError(name) from err 29 30 def __setattr__(self, name, val): 31 self[name] = val 32 33token = None 34def get_auth_token(): 35 global token 36 37 if token is not None: 38 return token 39 40 try: 41 with open(os.path.join(os.path.expanduser('~'), '.ghoauth')) as f: 42 token, = f 43 return token 44 except Exception: 45 pass 46 47 import keyring 48 token = keyring.get_password('github', fake_username) 49 if token is not None: 50 return token 51 52 print("Please enter your github username and password. These are not " 53 "stored, only used to get an oAuth token. You can revoke this at " 54 "any time on GitHub.") 55 user = input("Username: ") 56 pw = getpass.getpass("Password: ") 57 58 auth_request = { 59 "scopes": [ 60 "public_repo", 61 "gist" 62 ], 63 "note": "IPython tools", 64 "note_url": "https://github.com/ipython/ipython/tree/master/tools", 65 } 66 response = requests.post('https://api.github.com/authorizations', 67 auth=(user, pw), data=json.dumps(auth_request)) 68 response.raise_for_status() 69 token = json.loads(response.text)['token'] 70 keyring.set_password('github', fake_username, token) 71 return token 72 73def make_auth_header(): 74 return {'Authorization': 'token ' + get_auth_token().replace("\n","")} 75 76def post_issue_comment(project, num, body): 77 url = 'https://api.github.com/repos/{project}/issues/{num}/comments'.format(project=project, num=num) 78 payload = json.dumps({'body': body}) 79 requests.post(url, data=payload, headers=make_auth_header()) 80 81def post_gist(content, description='', filename='file', auth=False): 82 """Post some text to a Gist, and return the URL.""" 83 post_data = json.dumps({ 84 "description": description, 85 "public": True, 86 "files": { 87 filename: { 88 "content": content 89 } 90 } 91 }).encode('utf-8') 92 93 headers = make_auth_header() if auth else {} 94 response = requests.post("https://api.github.com/gists", data=post_data, headers=headers) 95 response.raise_for_status() 96 response_data = json.loads(response.text) 97 return response_data['html_url'] 98 99def get_pull_request(project, num, auth=False): 100 """get pull request info by number 101 """ 102 url = "https://api.github.com/repos/{project}/pulls/{num}".format(project=project, num=num) 103 if auth: 104 header = make_auth_header() 105 else: 106 header = None 107 print("fetching %s" % url, file=sys.stderr) 108 response = requests.get(url, headers=header) 109 response.raise_for_status() 110 return json.loads(response.text, object_hook=Obj) 111 112def get_pull_request_files(project, num, auth=False): 113 """get list of files in a pull request""" 114 url = "https://api.github.com/repos/{project}/pulls/{num}/files".format(project=project, num=num) 115 if auth: 116 header = make_auth_header() 117 else: 118 header = None 119 return get_paged_request(url, headers=header) 120 121element_pat = re.compile(r'<(.+?)>') 122rel_pat = re.compile(r'rel=[\'"](\w+)[\'"]') 123 124def get_paged_request(url, headers=None, **params): 125 """get a full list, handling APIv3's paging""" 126 results = [] 127 params.setdefault("per_page", 100) 128 while True: 129 if '?' in url: 130 params = None 131 print("fetching %s" % url, file=sys.stderr) 132 else: 133 print("fetching %s with %s" % (url, params), file=sys.stderr) 134 response = requests.get(url, headers=headers, params=params) 135 response.raise_for_status() 136 results.extend(response.json()) 137 if 'next' in response.links: 138 url = response.links['next']['url'] 139 else: 140 break 141 return results 142 143def get_pulls_list(project, auth=False, **params): 144 """get pull request list""" 145 params.setdefault("state", "closed") 146 url = "https://api.github.com/repos/{project}/pulls".format(project=project) 147 if auth: 148 headers = make_auth_header() 149 else: 150 headers = None 151 pages = get_paged_request(url, headers=headers, **params) 152 return pages 153 154def get_issues_list(project, auth=False, **params): 155 """get issues list""" 156 params.setdefault("state", "closed") 157 url = "https://api.github.com/repos/{project}/issues".format(project=project) 158 if auth: 159 headers = make_auth_header() 160 else: 161 headers = None 162 pages = get_paged_request(url, headers=headers, **params) 163 return pages 164 165def get_milestones(project, auth=False, **params): 166 params.setdefault('state', 'all') 167 url = "https://api.github.com/repos/{project}/milestones".format(project=project) 168 if auth: 169 headers = make_auth_header() 170 else: 171 headers = None 172 milestones = get_paged_request(url, headers=headers, **params) 173 return milestones 174 175def get_milestone_id(project, milestone, auth=False, **params): 176 milestones = get_milestones(project, auth=auth, **params) 177 for mstone in milestones: 178 if mstone['title'] == milestone: 179 return mstone['number'] 180 raise ValueError("milestone %s not found" % milestone) 181 182def is_pull_request(issue): 183 """Return True if the given issue is a pull request.""" 184 return bool(issue.get('pull_request', {}).get('html_url', None)) 185 186def get_authors(pr): 187 print("getting authors for #%i" % pr['number'], file=sys.stderr) 188 h = make_auth_header() 189 r = requests.get(pr['commits_url'], headers=h) 190 r.raise_for_status() 191 commits = r.json() 192 authors = [] 193 for commit in commits: 194 author = commit['commit']['author'] 195 authors.append("%s <%s>" % (author['name'], author['email'])) 196 return authors 197 198# encode_multipart_formdata is from urllib3.filepost 199# The only change is to iter_fields, to enforce S3's required key ordering 200 201def iter_fields(fields): 202 fields = fields.copy() 203 for key in [ 204 'key', 'acl', 'Filename', 'success_action_status', 205 'AWSAccessKeyId', 'Policy', 'Signature', 'Content-Type', 'file']: 206 yield key, fields.pop(key) 207 yield from fields.items() 208 209def encode_multipart_formdata(fields, boundary=None): 210 """ 211 Encode a dictionary of ``fields`` using the multipart/form-data mime format. 212 213 :param fields: 214 Dictionary of fields or list of (key, value) field tuples. The key is 215 treated as the field name, and the value as the body of the form-data 216 bytes. If the value is a tuple of two elements, then the first element 217 is treated as the filename of the form-data section. 218 219 Field names and filenames must be unicode. 220 221 :param boundary: 222 If not specified, then a random boundary will be generated using 223 :func:`mimetools.choose_boundary`. 224 """ 225 # copy requests imports in here: 226 from io import BytesIO 227 from requests.packages.urllib3.filepost import ( 228 choose_boundary, writer, b, get_content_type 229 ) 230 body = BytesIO() 231 if boundary is None: 232 boundary = choose_boundary() 233 234 for fieldname, value in iter_fields(fields): 235 body.write(b('--%s\r\n' % (boundary))) 236 237 if isinstance(value, tuple): 238 filename, data = value 239 writer(body).write('Content-Disposition: form-data; name="%s"; ' 240 'filename="%s"\r\n' % (fieldname, filename)) 241 body.write(b('Content-Type: %s\r\n\r\n' % 242 (get_content_type(filename)))) 243 else: 244 data = value 245 writer(body).write('Content-Disposition: form-data; name="%s"\r\n' 246 % (fieldname)) 247 body.write(b'Content-Type: text/plain\r\n\r\n') 248 249 if isinstance(data, int): 250 data = str(data) # Backwards compatibility 251 if isinstance(data, str): 252 writer(body).write(data) 253 else: 254 body.write(data) 255 256 body.write(b'\r\n') 257 258 body.write(b('--%s--\r\n' % (boundary))) 259 260 content_type = b('multipart/form-data; boundary=%s' % boundary) 261 262 return body.getvalue(), content_type 263 264 265def post_download(project, filename, name=None, description=""): 266 """Upload a file to the GitHub downloads area""" 267 if name is None: 268 name = os.path.basename(filename) 269 with open(filename, 'rb') as f: 270 filedata = f.read() 271 272 url = "https://api.github.com/repos/{project}/downloads".format(project=project) 273 274 payload = json.dumps(dict(name=name, size=len(filedata), 275 description=description)) 276 response = requests.post(url, data=payload, headers=make_auth_header()) 277 response.raise_for_status() 278 reply = json.loads(response.content) 279 s3_url = reply['s3_url'] 280 281 fields = dict( 282 key=reply['path'], 283 acl=reply['acl'], 284 success_action_status=201, 285 Filename=reply['name'], 286 AWSAccessKeyId=reply['accesskeyid'], 287 Policy=reply['policy'], 288 Signature=reply['signature'], 289 file=(reply['name'], filedata), 290 ) 291 fields['Content-Type'] = reply['mime_type'] 292 data, content_type = encode_multipart_formdata(fields) 293 s3r = requests.post(s3_url, data=data, headers={'Content-Type': content_type}) 294 return s3r 295