1import asyncio 2from datetime import datetime 3import inspect 4import sys 5import os 6import json 7import time 8import csv 9import decimal 10from decimal import Decimal 11from typing import Sequence, Optional 12 13from aiorpcx.curio import timeout_after, TaskTimeout, TaskGroup 14import aiohttp 15 16from . import util 17from .bitcoin import COIN 18from .i18n import _ 19from .util import (ThreadJob, make_dir, log_exceptions, 20 make_aiohttp_session, resource_path) 21from .network import Network 22from .simple_config import SimpleConfig 23from .logging import Logger 24 25 26DEFAULT_ENABLED = False 27DEFAULT_CURRENCY = "EUR" 28DEFAULT_EXCHANGE = "CoinGecko" # default exchange should ideally provide historical rates 29 30 31# See https://en.wikipedia.org/wiki/ISO_4217 32CCY_PRECISIONS = {'BHD': 3, 'BIF': 0, 'BYR': 0, 'CLF': 4, 'CLP': 0, 33 'CVE': 0, 'DJF': 0, 'GNF': 0, 'IQD': 3, 'ISK': 0, 34 'JOD': 3, 'JPY': 0, 'KMF': 0, 'KRW': 0, 'KWD': 3, 35 'LYD': 3, 'MGA': 1, 'MRO': 1, 'OMR': 3, 'PYG': 0, 36 'RWF': 0, 'TND': 3, 'UGX': 0, 'UYI': 0, 'VND': 0, 37 'VUV': 0, 'XAF': 0, 'XAU': 4, 'XOF': 0, 'XPF': 0} 38 39 40class ExchangeBase(Logger): 41 42 def __init__(self, on_quotes, on_history): 43 Logger.__init__(self) 44 self.history = {} 45 self.quotes = {} 46 self.on_quotes = on_quotes 47 self.on_history = on_history 48 49 async def get_raw(self, site, get_string): 50 # APIs must have https 51 url = ''.join(['https://', site, get_string]) 52 network = Network.get_instance() 53 proxy = network.proxy if network else None 54 async with make_aiohttp_session(proxy) as session: 55 async with session.get(url) as response: 56 response.raise_for_status() 57 return await response.text() 58 59 async def get_json(self, site, get_string): 60 # APIs must have https 61 url = ''.join(['https://', site, get_string]) 62 network = Network.get_instance() 63 proxy = network.proxy if network else None 64 async with make_aiohttp_session(proxy) as session: 65 async with session.get(url) as response: 66 response.raise_for_status() 67 # set content_type to None to disable checking MIME type 68 return await response.json(content_type=None) 69 70 async def get_csv(self, site, get_string): 71 raw = await self.get_raw(site, get_string) 72 reader = csv.DictReader(raw.split('\n')) 73 return list(reader) 74 75 def name(self): 76 return self.__class__.__name__ 77 78 async def update_safe(self, ccy): 79 try: 80 self.logger.info(f"getting fx quotes for {ccy}") 81 self.quotes = await self.get_rates(ccy) 82 self.logger.info("received fx quotes") 83 except asyncio.CancelledError: 84 # CancelledError must be passed-through for cancellation to work 85 raise 86 except aiohttp.ClientError as e: 87 self.logger.info(f"failed fx quotes: {repr(e)}") 88 self.quotes = {} 89 except Exception as e: 90 self.logger.exception(f"failed fx quotes: {repr(e)}") 91 self.quotes = {} 92 self.on_quotes() 93 94 def read_historical_rates(self, ccy, cache_dir) -> Optional[dict]: 95 filename = os.path.join(cache_dir, self.name() + '_'+ ccy) 96 if not os.path.exists(filename): 97 return None 98 timestamp = os.stat(filename).st_mtime 99 try: 100 with open(filename, 'r', encoding='utf-8') as f: 101 h = json.loads(f.read()) 102 except: 103 return None 104 if not h: # e.g. empty dict 105 return None 106 h['timestamp'] = timestamp 107 self.history[ccy] = h 108 self.on_history() 109 return h 110 111 @log_exceptions 112 async def get_historical_rates_safe(self, ccy, cache_dir): 113 try: 114 self.logger.info(f"requesting fx history for {ccy}") 115 h = await self.request_history(ccy) 116 self.logger.info(f"received fx history for {ccy}") 117 except aiohttp.ClientError as e: 118 self.logger.info(f"failed fx history: {repr(e)}") 119 return 120 except Exception as e: 121 self.logger.exception(f"failed fx history: {repr(e)}") 122 return 123 filename = os.path.join(cache_dir, self.name() + '_' + ccy) 124 with open(filename, 'w', encoding='utf-8') as f: 125 f.write(json.dumps(h)) 126 h['timestamp'] = time.time() 127 self.history[ccy] = h 128 self.on_history() 129 130 def get_historical_rates(self, ccy, cache_dir): 131 if ccy not in self.history_ccys(): 132 return 133 h = self.history.get(ccy) 134 if h is None: 135 h = self.read_historical_rates(ccy, cache_dir) 136 if h is None or h['timestamp'] < time.time() - 24*3600: 137 asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir)) 138 139 def history_ccys(self): 140 return [] 141 142 def historical_rate(self, ccy, d_t): 143 return self.history.get(ccy, {}).get(d_t.strftime('%Y-%m-%d'), 'NaN') 144 145 async def request_history(self, ccy): 146 raise NotImplementedError() # implemented by subclasses 147 148 async def get_rates(self, ccy): 149 raise NotImplementedError() # implemented by subclasses 150 151 async def get_currencies(self): 152 rates = await self.get_rates('') 153 return sorted([str(a) for (a, b) in rates.items() if b is not None and len(a)==3]) 154 155 156class BitcoinAverage(ExchangeBase): 157 # note: historical rates used to be freely available 158 # but this is no longer the case. see #5188 159 160 async def get_rates(self, ccy): 161 json = await self.get_json('apiv2.bitcoinaverage.com', '/indices/global/ticker/short') 162 return dict([(r.replace("BTC", ""), Decimal(json[r]['last'])) 163 for r in json if r != 'timestamp']) 164 165 166class Bitcointoyou(ExchangeBase): 167 168 async def get_rates(self, ccy): 169 json = await self.get_json('bitcointoyou.com', "/API/ticker.aspx") 170 return {'BRL': Decimal(json['ticker']['last'])} 171 172 173class BitcoinVenezuela(ExchangeBase): 174 175 async def get_rates(self, ccy): 176 json = await self.get_json('api.bitcoinvenezuela.com', '/') 177 rates = [(r, json['BTC'][r]) for r in json['BTC'] 178 if json['BTC'][r] is not None] # Giving NULL for LTC 179 return dict(rates) 180 181 def history_ccys(self): 182 return ['ARS', 'EUR', 'USD', 'VEF'] 183 184 async def request_history(self, ccy): 185 json = await self.get_json('api.bitcoinvenezuela.com', 186 "/historical/index.php?coin=BTC") 187 return json[ccy +'_BTC'] 188 189 190class Bitbank(ExchangeBase): 191 192 async def get_rates(self, ccy): 193 json = await self.get_json('public.bitbank.cc', '/btc_jpy/ticker') 194 return {'JPY': Decimal(json['data']['last'])} 195 196 197class BitFlyer(ExchangeBase): 198 199 async def get_rates(self, ccy): 200 json = await self.get_json('bitflyer.jp', '/api/echo/price') 201 return {'JPY': Decimal(json['mid'])} 202 203 204class BitPay(ExchangeBase): 205 206 async def get_rates(self, ccy): 207 json = await self.get_json('bitpay.com', '/api/rates') 208 return dict([(r['code'], Decimal(r['rate'])) for r in json]) 209 210 211class Bitso(ExchangeBase): 212 213 async def get_rates(self, ccy): 214 json = await self.get_json('api.bitso.com', '/v2/ticker') 215 return {'MXN': Decimal(json['last'])} 216 217 218class BitStamp(ExchangeBase): 219 220 async def get_currencies(self): 221 return ['USD', 'EUR'] 222 223 async def get_rates(self, ccy): 224 if ccy in CURRENCIES[self.name()]: 225 json = await self.get_json('www.bitstamp.net', f'/api/v2/ticker/btc{ccy.lower()}/') 226 return {ccy: Decimal(json['last'])} 227 return {} 228 229 230class Bitvalor(ExchangeBase): 231 232 async def get_rates(self,ccy): 233 json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') 234 return {'BRL': Decimal(json['ticker_1h']['total']['last'])} 235 236 237class BlockchainInfo(ExchangeBase): 238 239 async def get_rates(self, ccy): 240 json = await self.get_json('blockchain.info', '/ticker') 241 return dict([(r, Decimal(json[r]['15m'])) for r in json]) 242 243 244class Bylls(ExchangeBase): 245 246 async def get_rates(self, ccy): 247 json = await self.get_json('bylls.com', '/api/price?from_currency=BTC&to_currency=CAD') 248 return {'CAD': Decimal(json['public_price']['to_price'])} 249 250 251class Coinbase(ExchangeBase): 252 253 async def get_rates(self, ccy): 254 json = await self.get_json('api.coinbase.com', 255 '/v2/exchange-rates?currency=BTC') 256 return {ccy: Decimal(rate) for (ccy, rate) in json["data"]["rates"].items()} 257 258 259class CoinCap(ExchangeBase): 260 261 async def get_rates(self, ccy): 262 json = await self.get_json('api.coincap.io', '/v2/rates/bitcoin/') 263 return {'USD': Decimal(json['data']['rateUsd'])} 264 265 def history_ccys(self): 266 return ['USD'] 267 268 async def request_history(self, ccy): 269 # Currently 2000 days is the maximum in 1 API call 270 # (and history starts on 2017-03-23) 271 history = await self.get_json('api.coincap.io', 272 '/v2/assets/bitcoin/history?interval=d1&limit=2000') 273 return dict([(datetime.utcfromtimestamp(h['time']/1000).strftime('%Y-%m-%d'), h['priceUsd']) 274 for h in history['data']]) 275 276 277class CoinDesk(ExchangeBase): 278 279 async def get_currencies(self): 280 dicts = await self.get_json('api.coindesk.com', 281 '/v1/bpi/supported-currencies.json') 282 return [d['currency'] for d in dicts] 283 284 async def get_rates(self, ccy): 285 json = await self.get_json('api.coindesk.com', 286 '/v1/bpi/currentprice/%s.json' % ccy) 287 result = {ccy: Decimal(json['bpi'][ccy]['rate_float'])} 288 return result 289 290 def history_starts(self): 291 return {'USD': '2012-11-30', 'EUR': '2013-09-01'} 292 293 def history_ccys(self): 294 return self.history_starts().keys() 295 296 async def request_history(self, ccy): 297 start = self.history_starts()[ccy] 298 end = datetime.today().strftime('%Y-%m-%d') 299 # Note ?currency and ?index don't work as documented. Sigh. 300 query = ('/v1/bpi/historical/close.json?start=%s&end=%s' 301 % (start, end)) 302 json = await self.get_json('api.coindesk.com', query) 303 return json['bpi'] 304 305 306class CoinGecko(ExchangeBase): 307 308 async def get_rates(self, ccy): 309 json = await self.get_json('api.coingecko.com', '/api/v3/exchange_rates') 310 return dict([(ccy.upper(), Decimal(d['value'])) 311 for ccy, d in json['rates'].items()]) 312 313 def history_ccys(self): 314 # CoinGecko seems to have historical data for all ccys it supports 315 return CURRENCIES[self.name()] 316 317 async def request_history(self, ccy): 318 history = await self.get_json('api.coingecko.com', 319 '/api/v3/coins/bitcoin/market_chart?vs_currency=%s&days=max' % ccy) 320 321 return dict([(datetime.utcfromtimestamp(h[0]/1000).strftime('%Y-%m-%d'), h[1]) 322 for h in history['prices']]) 323 324 325class CointraderMonitor(ExchangeBase): 326 327 async def get_rates(self, ccy): 328 json = await self.get_json('cointradermonitor.com', '/api/pbb/v1/ticker') 329 return {'BRL': Decimal(json['last'])} 330 331 332class itBit(ExchangeBase): 333 334 async def get_rates(self, ccy): 335 ccys = ['USD', 'EUR', 'SGD'] 336 json = await self.get_json('api.itbit.com', '/v1/markets/XBT%s/ticker' % ccy) 337 result = dict.fromkeys(ccys) 338 if ccy in ccys: 339 result[ccy] = Decimal(json['lastPrice']) 340 return result 341 342 343class Kraken(ExchangeBase): 344 345 async def get_rates(self, ccy): 346 ccys = ['EUR', 'USD', 'CAD', 'GBP', 'JPY'] 347 pairs = ['XBT%s' % c for c in ccys] 348 json = await self.get_json('api.kraken.com', 349 '/0/public/Ticker?pair=%s' % ','.join(pairs)) 350 return dict((k[-3:], Decimal(float(v['c'][0]))) 351 for k, v in json['result'].items()) 352 353 354class LocalBitcoins(ExchangeBase): 355 356 async def get_rates(self, ccy): 357 json = await self.get_json('localbitcoins.com', 358 '/bitcoinaverage/ticker-all-currencies/') 359 return dict([(r, Decimal(json[r]['rates']['last'])) for r in json]) 360 361 362class MercadoBitcoin(ExchangeBase): 363 364 async def get_rates(self, ccy): 365 json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') 366 return {'BRL': Decimal(json['ticker_1h']['exchanges']['MBT']['last'])} 367 368 369class TheRockTrading(ExchangeBase): 370 371 async def get_rates(self, ccy): 372 json = await self.get_json('api.therocktrading.com', 373 '/v1/funds/BTCEUR/ticker') 374 return {'EUR': Decimal(json['last'])} 375 376 377class Winkdex(ExchangeBase): 378 379 async def get_rates(self, ccy): 380 json = await self.get_json('winkdex.com', '/api/v0/price') 381 return {'USD': Decimal(json['price'] / 100.0)} 382 383 def history_ccys(self): 384 return ['USD'] 385 386 async def request_history(self, ccy): 387 json = await self.get_json('winkdex.com', 388 "/api/v0/series?start_time=1342915200") 389 history = json['series'][0]['results'] 390 return dict([(h['timestamp'][:10], h['price'] / 100.0) 391 for h in history]) 392 393 394class Zaif(ExchangeBase): 395 async def get_rates(self, ccy): 396 json = await self.get_json('api.zaif.jp', '/api/1/last_price/btc_jpy') 397 return {'JPY': Decimal(json['last_price'])} 398 399 400class Bitragem(ExchangeBase): 401 402 async def get_rates(self,ccy): 403 json = await self.get_json('api.bitragem.com', '/v1/index?asset=BTC&market=BRL') 404 return {'BRL': Decimal(json['response']['index'])} 405 406 407class Biscoint(ExchangeBase): 408 409 async def get_rates(self,ccy): 410 json = await self.get_json('api.biscoint.io', '/v1/ticker?base=BTC"e=BRL') 411 return {'BRL': Decimal(json['data']['last'])} 412 413 414class Walltime(ExchangeBase): 415 416 async def get_rates(self, ccy): 417 json = await self.get_json('s3.amazonaws.com', 418 '/data-production-walltime-info/production/dynamic/walltime-info.json') 419 return {'BRL': Decimal(json['BRL_XBT']['last_inexact'])} 420 421 422def dictinvert(d): 423 inv = {} 424 for k, vlist in d.items(): 425 for v in vlist: 426 keys = inv.setdefault(v, []) 427 keys.append(k) 428 return inv 429 430def get_exchanges_and_currencies(): 431 # load currencies.json from disk 432 path = resource_path('currencies.json') 433 try: 434 with open(path, 'r', encoding='utf-8') as f: 435 return json.loads(f.read()) 436 except: 437 pass 438 # or if not present, generate it now. 439 print("cannot find currencies.json. will regenerate it now.") 440 d = {} 441 is_exchange = lambda obj: (inspect.isclass(obj) 442 and issubclass(obj, ExchangeBase) 443 and obj != ExchangeBase) 444 exchanges = dict(inspect.getmembers(sys.modules[__name__], is_exchange)) 445 446 async def get_currencies_safe(name, exchange): 447 try: 448 d[name] = await exchange.get_currencies() 449 print(name, "ok") 450 except: 451 print(name, "error") 452 453 async def query_all_exchanges_for_their_ccys_over_network(): 454 async with timeout_after(10): 455 async with TaskGroup() as group: 456 for name, klass in exchanges.items(): 457 exchange = klass(None, None) 458 await group.spawn(get_currencies_safe(name, exchange)) 459 loop = asyncio.get_event_loop() 460 try: 461 loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network()) 462 except Exception as e: 463 pass 464 with open(path, 'w', encoding='utf-8') as f: 465 f.write(json.dumps(d, indent=4, sort_keys=True)) 466 return d 467 468 469CURRENCIES = get_exchanges_and_currencies() 470 471 472def get_exchanges_by_ccy(history=True): 473 if not history: 474 return dictinvert(CURRENCIES) 475 d = {} 476 exchanges = CURRENCIES.keys() 477 for name in exchanges: 478 klass = globals()[name] 479 exchange = klass(None, None) 480 d[name] = exchange.history_ccys() 481 return dictinvert(d) 482 483 484class FxThread(ThreadJob): 485 486 def __init__(self, config: SimpleConfig, network: Optional[Network]): 487 ThreadJob.__init__(self) 488 self.config = config 489 self.network = network 490 util.register_callback(self.set_proxy, ['proxy_set']) 491 self.ccy = self.get_currency() 492 self.history_used_spot = False 493 self.ccy_combo = None 494 self.hist_checkbox = None 495 self.cache_dir = os.path.join(config.path, 'cache') 496 self._trigger = asyncio.Event() 497 self._trigger.set() 498 self.set_exchange(self.config_exchange()) 499 make_dir(self.cache_dir) 500 501 def set_proxy(self, trigger_name, *args): 502 self._trigger.set() 503 504 @staticmethod 505 def get_currencies(history: bool) -> Sequence[str]: 506 d = get_exchanges_by_ccy(history) 507 return sorted(d.keys()) 508 509 @staticmethod 510 def get_exchanges_by_ccy(ccy: str, history: bool) -> Sequence[str]: 511 d = get_exchanges_by_ccy(history) 512 return d.get(ccy, []) 513 514 @staticmethod 515 def remove_thousands_separator(text): 516 return text.replace(',', '') # FIXME use THOUSAND_SEPARATOR in util 517 518 def ccy_amount_str(self, amount, commas): 519 prec = CCY_PRECISIONS.get(self.ccy, 2) 520 fmt_str = "{:%s.%df}" % ("," if commas else "", max(0, prec)) # FIXME use util.THOUSAND_SEPARATOR and util.DECIMAL_POINT 521 try: 522 rounded_amount = round(amount, prec) 523 except decimal.InvalidOperation: 524 rounded_amount = amount 525 return fmt_str.format(rounded_amount) 526 527 async def run(self): 528 while True: 529 # approx. every 2.5 minutes, refresh spot price 530 try: 531 async with timeout_after(150): 532 await self._trigger.wait() 533 self._trigger.clear() 534 # we were manually triggered, so get historical rates 535 if self.is_enabled() and self.show_history(): 536 self.exchange.get_historical_rates(self.ccy, self.cache_dir) 537 except TaskTimeout: 538 pass 539 if self.is_enabled(): 540 await self.exchange.update_safe(self.ccy) 541 542 def is_enabled(self): 543 return bool(self.config.get('use_exchange_rate', DEFAULT_ENABLED)) 544 545 def set_enabled(self, b): 546 self.config.set_key('use_exchange_rate', bool(b)) 547 self.trigger_update() 548 549 def get_history_config(self, *, allow_none=False): 550 val = self.config.get('history_rates', None) 551 if val is None and allow_none: 552 return None 553 return bool(val) 554 555 def set_history_config(self, b): 556 self.config.set_key('history_rates', bool(b)) 557 558 def get_history_capital_gains_config(self): 559 return bool(self.config.get('history_rates_capital_gains', False)) 560 561 def set_history_capital_gains_config(self, b): 562 self.config.set_key('history_rates_capital_gains', bool(b)) 563 564 def get_fiat_address_config(self): 565 return bool(self.config.get('fiat_address')) 566 567 def set_fiat_address_config(self, b): 568 self.config.set_key('fiat_address', bool(b)) 569 570 def get_currency(self): 571 '''Use when dynamic fetching is needed''' 572 return self.config.get("currency", DEFAULT_CURRENCY) 573 574 def config_exchange(self): 575 return self.config.get('use_exchange', DEFAULT_EXCHANGE) 576 577 def show_history(self): 578 return self.is_enabled() and self.get_history_config() and self.ccy in self.exchange.history_ccys() 579 580 def set_currency(self, ccy: str): 581 self.ccy = ccy 582 self.config.set_key('currency', ccy, True) 583 self.trigger_update() 584 self.on_quotes() 585 586 def trigger_update(self): 587 if self.network: 588 self.network.asyncio_loop.call_soon_threadsafe(self._trigger.set) 589 590 def set_exchange(self, name): 591 class_ = globals().get(name) or globals().get(DEFAULT_EXCHANGE) 592 self.logger.info(f"using exchange {name}") 593 if self.config_exchange() != name: 594 self.config.set_key('use_exchange', name, True) 595 assert issubclass(class_, ExchangeBase), f"unexpected type {class_} for {name}" 596 self.exchange = class_(self.on_quotes, self.on_history) # type: ExchangeBase 597 # A new exchange means new fx quotes, initially empty. Force 598 # a quote refresh 599 self.trigger_update() 600 self.exchange.read_historical_rates(self.ccy, self.cache_dir) 601 602 def on_quotes(self): 603 util.trigger_callback('on_quotes') 604 605 def on_history(self): 606 util.trigger_callback('on_history') 607 608 def exchange_rate(self) -> Decimal: 609 """Returns the exchange rate as a Decimal""" 610 if not self.is_enabled(): 611 return Decimal('NaN') 612 rate = self.exchange.quotes.get(self.ccy) 613 if rate is None: 614 return Decimal('NaN') 615 return Decimal(rate) 616 617 def format_amount(self, btc_balance, *, timestamp: int = None) -> str: 618 if timestamp is None: 619 rate = self.exchange_rate() 620 else: 621 rate = self.timestamp_rate(timestamp) 622 return '' if rate.is_nan() else "%s" % self.value_str(btc_balance, rate) 623 624 def format_amount_and_units(self, btc_balance, *, timestamp: int = None) -> str: 625 if timestamp is None: 626 rate = self.exchange_rate() 627 else: 628 rate = self.timestamp_rate(timestamp) 629 return '' if rate.is_nan() else "%s %s" % (self.value_str(btc_balance, rate), self.ccy) 630 631 def get_fiat_status_text(self, btc_balance, base_unit, decimal_point): 632 rate = self.exchange_rate() 633 return _(" (No FX rate available)") if rate.is_nan() else " 1 %s~%s %s" % (base_unit, 634 self.value_str(COIN / (10**(8 - decimal_point)), rate), self.ccy) 635 636 def fiat_value(self, satoshis, rate) -> Decimal: 637 return Decimal('NaN') if satoshis is None else Decimal(satoshis) / COIN * Decimal(rate) 638 639 def value_str(self, satoshis, rate) -> str: 640 return self.format_fiat(self.fiat_value(satoshis, rate)) 641 642 def format_fiat(self, value: Decimal) -> str: 643 if value.is_nan(): 644 return _("No data") 645 return "%s" % (self.ccy_amount_str(value, True)) 646 647 def history_rate(self, d_t: Optional[datetime]) -> Decimal: 648 if d_t is None: 649 return Decimal('NaN') 650 rate = self.exchange.historical_rate(self.ccy, d_t) 651 # Frequently there is no rate for today, until tomorrow :) 652 # Use spot quotes in that case 653 if rate in ('NaN', None) and (datetime.today().date() - d_t.date()).days <= 2: 654 rate = self.exchange.quotes.get(self.ccy, 'NaN') 655 self.history_used_spot = True 656 if rate is None: 657 rate = 'NaN' 658 return Decimal(rate) 659 660 def historical_value_str(self, satoshis, d_t: Optional[datetime]) -> str: 661 return self.format_fiat(self.historical_value(satoshis, d_t)) 662 663 def historical_value(self, satoshis, d_t: Optional[datetime]) -> Decimal: 664 return self.fiat_value(satoshis, self.history_rate(d_t)) 665 666 def timestamp_rate(self, timestamp: Optional[int]) -> Decimal: 667 from .util import timestamp_to_datetime 668 date = timestamp_to_datetime(timestamp) 669 return self.history_rate(date) 670 671 672assert globals().get(DEFAULT_EXCHANGE), f"default exchange {DEFAULT_EXCHANGE} does not exist" 673