1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5
6from azure.cli.core.azclierror import (ValidationError, CLIInternalError, UnclassifiedUserFault)
7from knack.log import get_logger
8
9from ._constants import (GITHUB_OAUTH_CLIENT_ID, GITHUB_OAUTH_SCOPES)
10
11logger = get_logger(__name__)
12
13
'''
Get a GitHub personal access token using the GitHub OAuth device flow for command-line tools:
https://docs.github.com/en/developers/apps/authorizing-oauth-apps#device-flow
'''
18
19
def get_github_access_token(cmd, scope_list=None):  # pylint: disable=unused-argument
    """Obtain a GitHub access token through the OAuth device flow.

    Requests a device code from GitHub, asks the user (via a warning log
    message) to open the verification URL and enter the user code, then polls
    GitHub until the user authorizes the request, GitHub reports a terminal
    error, or the device code expires.

    Args:
        cmd: CLI command context. Not used by this function.
        scope_list: Optional list of OAuth scope names. Every entry must be in
            GITHUB_OAUTH_SCOPES; the entries are sent space-separated.

    Returns:
        The access token string returned by GitHub.

    Raises:
        ValidationError: If a requested scope is not in GITHUB_OAUTH_SCOPES.
        CLIInternalError: If a request or response-parsing step fails
            (network error, request timeout, unexpected response shape).
        UnclassifiedUserFault: If polling ends without a token, either because
            the device code expired or GitHub returned an error other than
            'authorization_pending' / 'slow_down'.
    """
    # Keep the caller's value untouched; only a validated, non-empty list is
    # turned into the space-separated string sent to GitHub.
    scope = scope_list
    if scope_list:
        if any(requested not in GITHUB_OAUTH_SCOPES for requested in scope_list):
            raise ValidationError("Requested github oauth scope is invalid")
        scope = ' '.join(scope_list)

    authorize_url = 'https://github.com/login/device/code'
    authorize_url_data = {
        'scope': scope,
        'client_id': GITHUB_OAUTH_CLIENT_ID
    }

    # Function-scope imports: only loaded when this flow actually runs.
    import requests
    import time
    from urllib.parse import parse_qs

    # Upper bound, in seconds, for each individual HTTP call. Without it a
    # stalled connection would block the command indefinitely.
    request_timeout = 30

    try:
        response = requests.post(authorize_url, data=authorize_url_data, timeout=request_timeout)
        parsed_response = parse_qs(response.content.decode('ascii'))

        device_code = parsed_response['device_code'][0]
        user_code = parsed_response['user_code'][0]
        verification_uri = parsed_response['verification_uri'][0]
        interval = int(parsed_response['interval'][0])
        expires_in_seconds = int(parsed_response['expires_in'][0])
        logger.warning('Please navigate to %s and enter the user code %s to activate and '
                       'retrieve your github personal access token', verification_uri, user_code)

        deadline = time.time() + expires_in_seconds
        logger.warning("Waiting up to '%s' minutes for activation", str(expires_in_seconds // 60))

        confirmation_url = 'https://github.com/login/oauth/access_token'
        confirmation_url_data = {
            'client_id': GITHUB_OAUTH_CLIENT_ID,
            'device_code': device_code,
            'grant_type': 'urn:ietf:params:oauth:grant-type:device_code'
        }

        while True:
            time.sleep(interval)

            # Leave the loop instead of raising here: an exception raised
            # inside this try block would be re-wrapped as CLIInternalError by
            # the handler below, hiding the intended UnclassifiedUserFault.
            if time.time() > deadline:
                break

            confirmation_response = requests.post(
                confirmation_url, data=confirmation_url_data, timeout=request_timeout)
            parsed_confirmation_response = parse_qs(confirmation_response.content.decode('ascii'))

            access_token = parsed_confirmation_response.get('access_token', [''])[0]
            if access_token:
                return access_token

            error = parsed_confirmation_response.get('error', [''])[0]
            if error == 'slow_down':
                interval += 5  # if slow_down error is received, 5 seconds is added to minimum polling interval
            elif error and error != 'authorization_pending':
                # Any other error ends polling; 'authorization_pending' just
                # means the user has not finished yet.
                break
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise CLIInternalError(
            'Error: {}. Please try again, or retrieve personal access token from the Github website'.format(e)) from e

    raise UnclassifiedUserFault('Activation did not happen in time. Please try again')
82