from functools import partial
try:
    from urllib import urlencode
except ImportError:
    from urllib.parse import urlencode

from soundcloud.resource import wrapped_resource
from soundcloud.request import make_request


class Client(object):
    """A client for interacting with Soundcloud resources.

    HTTP verbs (``get``, ``post``, ``put``, ``head``, ``delete``) are exposed
    as dynamic methods via ``__getattr__``; each one forwards to
    ``_request`` with the verb pre-bound, e.g. ``client.get('tracks')``.
    """

    use_ssl = True
    host = 'api.soundcloud.com'

    # Names that ``__getattr__`` turns into request methods.
    _http_verbs = ('get', 'post', 'put', 'head', 'delete')

    def __init__(self, **kwargs):
        """Create a client instance with the provided options.

        Options should be passed in as kwargs. The ones read by this class
        are ``use_ssl``, ``host``, ``client_id``, ``client_secret``,
        ``access_token``, ``scope``, ``redirect_uri`` (or ``redirect_url``),
        ``username``, ``password``, ``refresh_token``, ``verify_ssl`` and
        ``proxies``.

        Raises:
            TypeError: if neither ``access_token`` nor ``client_id`` is given.
        """
        self.use_ssl = kwargs.get('use_ssl', self.use_ssl)
        self.host = kwargs.get('host', self.host)
        self.scheme = 'https://' if self.use_ssl else 'http://'
        self.options = kwargs
        self._authorize_url = None

        self.client_id = kwargs.get('client_id')

        # An explicit access token short-circuits every OAuth2 flow.
        if 'access_token' in kwargs:
            self.access_token = kwargs.get('access_token')
            return

        if 'client_id' not in kwargs:
            raise TypeError("At least a client_id must be provided.")

        if 'scope' in kwargs:
            self.scope = kwargs.get('scope')

        # Decide which protocol flow to follow based on the arguments
        # provided by the caller. The first matching flow wins.
        if self._options_for_authorization_code_flow_present():
            self._authorization_code_flow()
        elif self._options_for_credentials_flow_present():
            self._credentials_flow()
        elif self._options_for_token_refresh_present():
            self._refresh_token_flow()

    def exchange_token(self, code):
        """Given the value of the code parameter, request an access token.

        Stores the response on ``self.token``, copies its ``access_token``
        onto ``self.access_token`` and returns the token resource.
        """
        return self._request_token({
            'grant_type': 'authorization_code',
            'redirect_uri': self._redirect_uri(),
            'client_id': self.options.get('client_id'),
            'client_secret': self.options.get('client_secret'),
            'code': code,
        })

    def authorize_url(self):
        """Return the authorization URL for OAuth2 authorization code flow.

        This is ``None`` unless the authorization code flow was selected in
        the constructor.
        """
        return self._authorize_url

    def _authorization_code_flow(self):
        """Build the auth URL so the user can authorize the app."""
        options = {
            'scope': getattr(self, 'scope', 'non-expiring'),
            'client_id': self.options.get('client_id'),
            'response_type': 'code',
            'redirect_uri': self._redirect_uri(),
        }
        url = '%s%s/connect' % (self.scheme, self.host)
        self._authorize_url = '%s?%s' % (url, urlencode(options))

    def _refresh_token_flow(self):
        """Given a refresh token, obtain a new access token."""
        self._request_token({
            'grant_type': 'refresh_token',
            'client_id': self.options.get('client_id'),
            'client_secret': self.options.get('client_secret'),
            'refresh_token': self.options.get('refresh_token'),
        })

    def _credentials_flow(self):
        """Given a username and password, obtain an access token."""
        self._request_token({
            'client_id': self.options.get('client_id'),
            'client_secret': self.options.get('client_secret'),
            'username': self.options.get('username'),
            'password': self.options.get('password'),
            'scope': getattr(self, 'scope', ''),
            'grant_type': 'password',
        })

    def _transport_options(self):
        """Return the ``verify_ssl`` and ``proxies`` settings for a request."""
        return {
            'verify_ssl': self.options.get('verify_ssl', True),
            'proxies': self.options.get('proxies', None),
        }

    def _request_token(self, options):
        """POST ``options`` to the OAuth2 token endpoint and store the result.

        The transport settings are merged into ``options`` before sending.
        The wrapped response is kept on ``self.token`` and its
        ``access_token`` attribute on ``self.access_token``.
        """
        url = '%s%s/oauth2/token' % (self.scheme, self.host)
        options.update(self._transport_options())
        self.token = wrapped_resource(make_request('post', url, options))
        self.access_token = self.token.access_token
        return self.token

    def _request(self, method, resource, **kwargs):
        """Given an HTTP method, a resource name and kwargs, construct a
        request and return the response.
        """
        url = self._resolve_resource_name(resource)

        # Credentials are attached only when the attribute exists; a missing
        # attribute surfaces as AttributeError from __getattr__, which
        # hasattr() reports as False.
        if hasattr(self, 'access_token'):
            kwargs['oauth_token'] = self.access_token
        if hasattr(self, 'client_id'):
            kwargs['client_id'] = self.client_id

        kwargs.update(self._transport_options())
        return wrapped_resource(make_request(method, url, kwargs))

    def __getattr__(self, name, **kwargs):
        """Translate an HTTP verb into a request method.

        Only called when normal attribute lookup fails. Any name that is not
        a supported verb raises ``AttributeError`` naming the attribute.
        """
        if name not in self._http_verbs:
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name))
        return partial(self._request, name, **kwargs)

    def _resolve_resource_name(self, name):
        """Convert a resource name (e.g. tracks) into a URI.

        Absolute ``http://`` / ``https://`` URLs are returned untouched;
        anything else is joined onto the configured scheme and host.
        """
        # Match the full scheme prefix so that a resource which merely
        # starts with the letters "http" is not mistaken for a URL.
        if name.startswith(('http://', 'https://')):
            return name
        return '%s%s/%s' % (self.scheme, self.host, name.strip('/'))

    def _redirect_uri(self):
        """
        Return the redirect uri. Checks for ``redirect_uri`` or common typo,
        ``redirect_url``
        """
        return self.options.get(
            'redirect_uri',
            self.options.get('redirect_url', None))

    # Helper functions for testing arguments provided to the constructor.
    def _options_present(self, options, kwargs):
        """Return True when every name in ``options`` is a key of ``kwargs``."""
        return all(key in kwargs for key in options)

    def _options_for_credentials_flow_present(self):
        """Return True when the options for the password flow were given."""
        required = ('client_id', 'client_secret', 'username', 'password')
        return self._options_present(required, self.options)

    def _options_for_authorization_code_flow_present(self):
        """Return True when ``client_id`` and a redirect URI were given.

        The redirect URI may be spelled ``redirect_uri`` or ``redirect_url``.
        """
        required = ('client_id', 'redirect_uri')
        or_required = ('client_id', 'redirect_url')
        return (self._options_present(required, self.options) or
                self._options_present(or_required, self.options))

    def _options_for_token_refresh_present(self):
        """Return True when the options for the refresh flow were given."""
        required = ('client_id', 'client_secret', 'refresh_token')
        return self._options_present(required, self.options)