1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vi:ts=4:et 4 5from . import localhost 6import threading 7import pycurl 8import pytest 9import unittest 10 11from . import appmanager 12from . import util 13 14setup_module, teardown_module = appmanager.setup(('app', 8380)) 15 16class WorkerThread(threading.Thread): 17 18 def __init__(self, share): 19 threading.Thread.__init__(self) 20 self.curl = util.DefaultCurl() 21 self.curl.setopt(pycurl.URL, 'http://%s:8380/success' % localhost) 22 self.curl.setopt(pycurl.SHARE, share) 23 self.sio = util.BytesIO() 24 self.curl.setopt(pycurl.WRITEFUNCTION, self.sio.write) 25 26 def run(self): 27 self.curl.perform() 28 self.curl.close() 29 30class ShareTest(unittest.TestCase): 31 def test_share(self): 32 s = pycurl.CurlShare() 33 s.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_COOKIE) 34 s.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS) 35 s.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_SSL_SESSION) 36 37 t1 = WorkerThread(s) 38 t2 = WorkerThread(s) 39 40 t1.start() 41 t2.start() 42 43 t1.join() 44 t2.join() 45 46 del s 47 48 self.assertEqual('success', t1.sio.getvalue().decode()) 49 self.assertEqual('success', t2.sio.getvalue().decode()) 50 51 def test_share_close(self): 52 s = pycurl.CurlShare() 53 s.close() 54 55 def test_share_close_twice(self): 56 s = pycurl.CurlShare() 57 s.close() 58 s.close() 59 60 # positional arguments are rejected 61 def test_positional_arguments(self): 62 with pytest.raises(TypeError): 63 pycurl.CurlShare(1) 64 65 # keyword arguments are rejected 66 def test_keyword_arguments(self): 67 with pytest.raises(TypeError): 68 pycurl.CurlShare(a=1) 69