1from __future__ import absolute_import, unicode_literals 2 3from kombu.pools import producers 4 5from .queues import task_exchange 6 7priority_to_routing_key = { 8 'high': 'hipri', 9 'mid': 'midpri', 10 'low': 'lopri', 11} 12 13 14def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'): 15 payload = {'fun': fun, 'args': args, 'kwargs': kwargs} 16 routing_key = priority_to_routing_key[priority] 17 18 with producers[connection].acquire(block=True) as producer: 19 producer.publish(payload, 20 serializer='pickle', 21 compression='bzip2', 22 exchange=task_exchange, 23 declare=[task_exchange], 24 routing_key=routing_key) 25 26if __name__ == '__main__': 27 from kombu import Connection 28 from .tasks import hello_task 29 30 connection = Connection('amqp://guest:guest@localhost:5672//') 31 send_as_task(connection, fun=hello_task, args=('Kombu',), kwargs={}, 32 priority='high') 33