1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2007-2009 Christopher Lenz
4# All rights reserved.
5#
6# This software is licensed as described in the file COPYING, which
7# you should have received as part of this distribution.
8
9from datetime import datetime
10import os
11import os.path
12import shutil
13import time
14import tempfile
15import threading
16import unittest
17
18from couchdb import client, http, util
19from couchdb.tests import testutil
20
21
class ServerTestCase(testutil.TempDatabaseMixin, unittest.TestCase):
    """Tests for ``client.Server``; they talk to a running CouchDB server.

    ``self.server``, ``self.temp_db()`` and ``self.del_db()`` are not defined
    here; presumably they are provided by ``testutil.TempDatabaseMixin``.
    """

    def test_init_with_resource(self):
        # A Server may be constructed from an http.Resource instead of a URL.
        sess = http.Session()
        res = http.Resource(client.DEFAULT_BASE_URL, sess)
        serv = client.Server(url=res)
        serv.config()

    def test_init_with_session(self):
        # An explicitly supplied session must be the one the server uses.
        sess = http.Session()
        serv = client.Server(client.DEFAULT_BASE_URL, session=sess)
        serv.config()
        self.assertTrue(serv.resource.session is sess)

    def test_exists(self):
        # Truthiness of a Server is expected to reflect reachability.
        self.assertTrue(client.Server(client.DEFAULT_BASE_URL))
        self.assertFalse(client.Server('http://localhost:9999'))

    def test_repr(self):
        # Only checks that repr() does not raise.
        repr(self.server)

    def test_server_vars(self):
        version = self.server.version()
        self.assertTrue(isinstance(version, util.strbase))
        config = self.server.config()
        self.assertTrue(isinstance(config, dict))
        tasks = self.server.tasks()
        self.assertTrue(isinstance(tasks, list))

    def test_server_stats(self):
        stats = self.server.stats()
        self.assertTrue(isinstance(stats, dict))
        # Asking for a single statistic should narrow the result to one entry.
        stats = self.server.stats('httpd/requests')
        self.assertTrue(isinstance(stats, dict))
        self.assertTrue(len(stats) == 1 and len(stats['httpd']) == 1)

    def test_get_db_missing(self):
        self.assertRaises(http.ResourceNotFound,
                          lambda: self.server['couchdb-python/missing'])

    def test_create_db_conflict(self):
        # Creating a database under an already used name must fail.
        name, db = self.temp_db()
        self.assertRaises(http.PreconditionFailed, self.server.create,
                          name)

    def test_delete_db(self):
        # unittest assertions are used instead of bare ``assert`` so that the
        # checks are not stripped when Python runs with -O.
        name, db = self.temp_db()
        self.assertTrue(name in self.server)
        self.del_db(name)
        self.assertTrue(name not in self.server)

    def test_delete_db_missing(self):
        self.assertRaises(http.ResourceNotFound, self.server.delete,
                          'couchdb-python/missing')

    def test_replicate(self):
        aname, a = self.temp_db()
        bname, b = self.temp_db()
        doc_id, rev = a.save({'test': 'a'})
        result = self.server.replicate(aname, bname)
        self.assertEqual(result['ok'], True)
        self.assertEqual(b[doc_id]['test'], 'a')

        # Change the copy in b and replicate back; both sides must agree.
        doc = b[doc_id]
        doc['test'] = 'b'
        b.update([doc])
        self.server.replicate(bname, aname)
        self.assertEqual(a[doc_id]['test'], 'b')
        self.assertEqual(b[doc_id]['test'], 'b')

    def test_replicate_continuous(self):
        aname, a = self.temp_db()
        bname, b = self.temp_db()
        result = self.server.replicate(aname, bname, continuous=True)
        self.assertEqual(result['ok'], True)
        # '_local_id' is only checked for servers reporting version >= 0.10.
        version = tuple(int(i) for i in self.server.version().split('.')[:2])
        if version >= (0, 10):
            self.assertTrue('_local_id' in result)

    def test_iter(self):
        aname, a = self.temp_db()
        bname, b = self.temp_db()
        dbs = list(self.server)
        self.assertTrue(aname in dbs)
        self.assertTrue(bname in dbs)

    def test_len(self):
        self.temp_db()
        self.temp_db()
        self.assertTrue(len(self.server) >= 2)

    def test_uuids(self):
        # isinstance() replaces the exact ``type(...) == list`` comparison,
        # and unittest assertions replace bare ``assert`` statements.
        ls = self.server.uuids()
        self.assertTrue(isinstance(ls, list))
        ls = self.server.uuids(count=10)
        self.assertTrue(isinstance(ls, list))
        self.assertEqual(len(ls), 10)

    def test_235_unicode_server(self):

        # Make sure the server URL is a unicode string before using it.
        url = client.DEFAULT_BASE_URL
        if not isinstance(url, util.utype):
            url = url.decode('utf-8')

        server = client.Server(url)
        dbname = 'couchdb-python/test-235-unicode-server'
        db = server.create(dbname)
        try:
            db.update([{'foo': u'\ua000'}])
        finally:
            # Always remove the database created above.
            server.delete(dbname)

    def test_basic_auth(self):
        # NOTE(review): assumes the server at localhost:5984 rejects these
        # credentials -- confirm against the test environment.
        url = "http://root:password@localhost:5984/"
        server = client.Server(url)
        dbname = 'couchdb-python/test_basic_auth'
        self.assertRaises(http.Unauthorized, server.create, dbname)

    def test_user_management(self):
        url = client.DEFAULT_BASE_URL
        if not isinstance(url, util.utype):
            url = url.decode('utf-8')

        server = client.Server(url)
        try:
            server.add_user('foo', 'secret', roles=['hero'])
            token = server.login('foo', 'secret')
            self.assertTrue(server.verify_token(token))
            self.assertTrue(server.logout(token))
        finally:
            # Remove the user even when one of the steps above failed.
            server.remove_user('foo')
152
153
class DatabaseTestCase(testutil.TempDatabaseMixin, unittest.TestCase):
    """Tests for ``client.Database``; they talk to a running CouchDB server.

    ``self.db``, ``self.server`` and ``self.temp_db()`` are not defined here;
    presumably they are provided by ``testutil.TempDatabaseMixin``.
    """

    def test_save_new(self):
        doc = {'foo': 'bar'}
        doc_id, rev = self.db.save(doc)
        self.assertTrue(doc_id is not None)
        self.assertTrue(rev is not None)
        # save() is expected to write the new id and rev back into the dict.
        self.assertEqual((doc_id, rev), (doc['_id'], doc['_rev']))
        doc = self.db.get(doc_id)
        self.assertEqual(doc['foo'], 'bar')

    def test_save_new_with_id(self):
        doc = {'_id': 'foo'}
        doc_id, rev = self.db.save(doc)
        self.assertTrue(doc['_id'] == doc_id == 'foo')
        self.assertEqual(doc['_rev'], rev)

    def test_save_existing(self):
        doc = {}
        id_rev_old = self.db.save(doc)
        doc['foo'] = True
        id_rev_new = self.db.save(doc)
        # Saving again must yield a new revision that is stored in the dict.
        self.assertTrue(doc['_rev'] == id_rev_new[1])
        self.assertTrue(id_rev_old[1] != id_rev_new[1])

    def test_save_new_batch(self):
        # With batch='ok' no revision is reported back.
        doc = {'_id': 'foo'}
        doc_id, rev = self.db.save(doc, batch='ok')
        self.assertTrue(rev is None)
        self.assertTrue('_rev' not in doc)

    def test_save_existing_batch(self):
        doc = {'_id': 'foo'}
        self.db.save(doc)
        id_rev_old = self.db.save(doc)
        id_rev_new = self.db.save(doc, batch='ok')
        # The batch save returns no rev and leaves the previous one in place.
        self.assertTrue(id_rev_new[1] is None)
        self.assertEqual(id_rev_old[1], doc['_rev'])

    def test_exists(self):
        self.assertTrue(self.db)
        self.assertFalse(client.Database('couchdb-python/missing'))

    def test_name(self):
        # Access name assigned during creation.
        name, db = self.temp_db()
        self.assertTrue(db.name == name)
        # Access lazily loaded name,
        self.assertTrue(client.Database(db.resource.url).name == name)

    def test_commit(self):
        self.assertEqual(self.db.commit()['ok'], True)

    def test_create_large_doc(self):
        # 10 chars * 110 * 1024 = 1,126,400 characters, i.e. roughly 1.1 MB.
        self.db['foo'] = {'data': '0123456789' * 110 * 1024}
        self.assertEqual('foo', self.db['foo']['_id'])

    def test_doc_id_quoting(self):
        # A document id containing a slash must round-trip.
        self.db['foo/bar'] = {'foo': 'bar'}
        self.assertEqual('bar', self.db['foo/bar']['foo'])
        del self.db['foo/bar']
        self.assertEqual(None, self.db.get('foo/bar'))

    def test_unicode(self):
        self.db[u'føø'] = {u'bår': u'Iñtërnâtiônàlizætiøn', 'baz': 'ASCII'}
        self.assertEqual(u'Iñtërnâtiônàlizætiøn', self.db[u'føø'][u'bår'])
        self.assertEqual(u'ASCII', self.db[u'føø'][u'baz'])

    def test_disallow_nan(self):
        # Storing a NaN value must be rejected with ValueError.
        self.assertRaises(ValueError, self.db.__setitem__,
                          'foo', {'number': float('nan')})

    def test_disallow_none_id(self):
        deldoc = lambda: self.db.delete({'_id': None, '_rev': None})
        self.assertRaises(ValueError, deldoc)

    def test_doc_revs(self):
        doc = {'bar': 42}
        self.db['foo'] = doc
        old_rev = doc['_rev']
        doc['bar'] = 43
        self.db['foo'] = doc
        new_rev = doc['_rev']

        new_doc = self.db.get('foo')
        self.assertEqual(new_rev, new_doc['_rev'])
        new_doc = self.db.get('foo', rev=new_rev)
        self.assertEqual(new_rev, new_doc['_rev'])
        old_doc = self.db.get('foo', rev=old_rev)
        self.assertEqual(old_rev, old_doc['_rev'])

        # revisions() is expected to list the newest revision first.
        revs = list(self.db.revisions('foo'))
        self.assertEqual(revs[0]['_rev'], new_rev)
        self.assertEqual(revs[1]['_rev'], old_rev)
        # An unknown document id yields an empty generator.
        gen = self.db.revisions('crap')
        self.assertRaises(StopIteration, next, gen)

        self.assertTrue(self.db.compact())
        # Poll until compaction has finished; sleep between polls instead of
        # spinning on HTTP requests as fast as possible.
        while self.db.info()['compact_running']:
            time.sleep(0.05)

        # 0.10 responds with 404, 0.9 responds with 500, same content
        try:
            doc = self.db.get('foo', rev=old_rev)
        except http.ServerError:
            doc = None
        self.assertTrue(doc is None)

    def test_attachment_crud(self):
        doc = {'bar': 42}
        self.db['foo'] = doc
        old_rev = doc['_rev']

        # Adding an attachment must bump the revision stored in the dict.
        self.db.put_attachment(doc, 'Foo bar', 'foo.txt', 'text/plain')
        self.assertNotEqual(old_rev, doc['_rev'])

        doc = self.db['foo']
        attachment = doc['_attachments']['foo.txt']
        self.assertEqual(len('Foo bar'), attachment['length'])
        self.assertEqual('text/plain', attachment['content_type'])

        # The attachment can be fetched via the document or via its id.
        self.assertEqual(b'Foo bar',
                         self.db.get_attachment(doc, 'foo.txt').read())
        self.assertEqual(b'Foo bar',
                         self.db.get_attachment('foo', 'foo.txt').read())

        old_rev = doc['_rev']
        self.db.delete_attachment(doc, 'foo.txt')
        self.assertNotEqual(old_rev, doc['_rev'])
        self.assertEqual(None, self.db['foo'].get('_attachments'))

    def test_attachment_crud_with_files(self):
        doc = {'bar': 42}
        self.db['foo'] = doc
        old_rev = doc['_rev']
        fileobj = util.StringIO(b'Foo bar baz')

        # No content type is passed; 'text/plain' is asserted below.
        self.db.put_attachment(doc, fileobj, 'foo.txt')
        self.assertNotEqual(old_rev, doc['_rev'])

        doc = self.db['foo']
        attachment = doc['_attachments']['foo.txt']
        self.assertEqual(len('Foo bar baz'), attachment['length'])
        self.assertEqual('text/plain', attachment['content_type'])

        self.assertEqual(b'Foo bar baz',
                         self.db.get_attachment(doc, 'foo.txt').read())
        self.assertEqual(b'Foo bar baz',
                         self.db.get_attachment('foo', 'foo.txt').read())

        old_rev = doc['_rev']
        self.db.delete_attachment(doc, 'foo.txt')
        self.assertNotEqual(old_rev, doc['_rev'])
        self.assertEqual(None, self.db['foo'].get('_attachments'))

    def test_empty_attachment(self):
        doc = {}
        self.db['foo'] = doc
        old_rev = doc['_rev']

        self.db.put_attachment(doc, '', 'empty.txt')
        self.assertNotEqual(old_rev, doc['_rev'])

        doc = self.db['foo']
        attachment = doc['_attachments']['empty.txt']
        self.assertEqual(0, attachment['length'])

    def test_default_attachment(self):
        doc = {}
        self.db['foo'] = doc
        self.assertTrue(self.db.get_attachment(doc, 'missing.txt') is None)
        # A caller supplied default is returned for a missing attachment.
        sentinel = object()
        self.assertTrue(self.db.get_attachment(doc, 'missing.txt', sentinel) is sentinel)

    def test_attachment_from_fs(self):
        tmpdir = tempfile.mkdtemp()
        # try/finally guarantees the temporary directory is removed even when
        # an upload or assertion below fails.
        try:
            tmpfile = os.path.join(tmpdir, 'test.txt')
            with open(tmpfile, 'w') as f:
                f.write('Hello!')
            doc = {}
            self.db['foo'] = doc
            # No filename is given; the attachment name asserted below is the
            # basename of the opened file.
            with open(tmpfile) as f:
                self.db.put_attachment(doc, f)
            doc = self.db.get('foo')
            self.assertTrue(doc['_attachments']['test.txt']['content_type'] == 'text/plain')
        finally:
            shutil.rmtree(tmpdir)

    def test_attachment_no_filename(self):
        # Plain string content without a filename cannot be stored.
        doc = {}
        self.db['foo'] = doc
        self.assertRaises(ValueError, self.db.put_attachment, doc, '')

    def test_json_attachment(self):
        doc = {}
        self.db['foo'] = doc
        self.db.put_attachment(doc, '{}', 'test.json', 'application/json')
        self.assertEqual(self.db.get_attachment(doc, 'test.json').read(), b'{}')

    def test_include_docs(self):
        doc = {'foo': 42, 'bar': 40}
        self.db['foo'] = doc

        rows = list(self.db.query(
            'function(doc) { emit(doc._id, null); }',
            include_docs=True
        ))
        self.assertEqual(1, len(rows))
        self.assertEqual(doc, rows[0].doc)

    def test_query_multi_get(self):
        for i in range(1, 6):
            self.db.save({'i': i})
        # Only the odd keys 1, 3 and 5 are requested.
        res = list(self.db.query('function(doc) { emit(doc.i, null); }',
                                 keys=list(range(1, 6, 2))))
        self.assertEqual(3, len(res))
        for idx, i in enumerate(range(1, 6, 2)):
            self.assertEqual(i, res[idx].key)

    def test_bulk_update_conflict(self):
        docs = [
            dict(type='Person', name='John Doe'),
            dict(type='Person', name='Mary Jane'),
            dict(type='City', name='Gotham City')
        ]
        self.db.update(docs)

        # update the first doc to provoke a conflict in the next bulk update
        doc = docs[0].copy()
        self.db[doc['_id']] = doc

        results = self.db.update(docs)
        self.assertEqual(False, results[0][0])
        self.assertTrue(isinstance(results[0][2], http.ResourceConflict))

    def test_bulk_update_all_or_nothing(self):
        docs = [
            dict(type='Person', name='John Doe'),
            dict(type='Person', name='Mary Jane'),
            dict(type='City', name='Gotham City')
        ]
        self.db.update(docs)

        # update the first doc to provoke a conflict in the next bulk update
        doc = docs[0].copy()
        doc['name'] = 'Jane Doe'
        self.db[doc['_id']] = doc

        results = self.db.update(docs, all_or_nothing=True)
        self.assertEqual(True, results[0][0])

        # Both revisions are expected to be kept as conflicting versions.
        doc = self.db.get(doc['_id'], conflicts=True)
        self.assertTrue('_conflicts' in doc)
        revs = self.db.get(doc['_id'], open_revs='all')
        self.assertEqual(len(revs), 2)

    def test_bulk_update_bad_doc(self):
        self.assertRaises(TypeError, self.db.update, [object()])

    def test_copy_doc(self):
        self.db['foo'] = {'status': 'testing'}
        result = self.db.copy('foo', 'bar')
        # copy() returns the revision of the destination document.
        self.assertEqual(result, self.db['bar'].rev)

    def test_copy_doc_conflict(self):
        self.db['bar'] = {'status': 'idle'}
        self.db['foo'] = {'status': 'testing'}
        self.assertRaises(http.ResourceConflict, self.db.copy, 'foo', 'bar')

    def test_copy_doc_overwrite(self):
        self.db['bar'] = {'status': 'idle'}
        self.db['foo'] = {'status': 'testing'}
        # Passing the existing destination document allows the overwrite.
        result = self.db.copy('foo', self.db['bar'])
        doc = self.db['bar']
        self.assertEqual(result, doc.rev)
        self.assertEqual('testing', doc['status'])

    def test_copy_doc_srcobj(self):
        self.db['foo'] = {'status': 'testing'}
        self.db.copy(self.db['foo'], 'bar')
        self.assertEqual('testing', self.db['bar']['status'])

    def test_copy_doc_destobj_norev(self):
        self.db['foo'] = {'status': 'testing'}
        self.db.copy('foo', {'_id': 'bar'})
        self.assertEqual('testing', self.db['bar']['status'])

    def test_copy_doc_src_dictlike(self):
        # An object that only offers items() is accepted as the source.
        class DictLike(object):
            def __init__(self, doc):
                self.doc = doc
            def items(self):
                return self.doc.items()
        self.db['foo'] = {'status': 'testing'}
        self.db.copy(DictLike(self.db['foo']), 'bar')
        self.assertEqual('testing', self.db['bar']['status'])

    def test_copy_doc_dest_dictlike(self):
        # An object that only offers items() is accepted as the destination.
        class DictLike(object):
            def __init__(self, doc):
                self.doc = doc
            def items(self):
                return self.doc.items()
        self.db['foo'] = {'status': 'testing'}
        self.db['bar'] = {}
        self.db.copy('foo', DictLike(self.db['bar']))
        self.assertEqual('testing', self.db['bar']['status'])

    def test_copy_doc_src_baddoc(self):
        self.assertRaises(TypeError, self.db.copy, object(), 'bar')

    def test_copy_doc_dest_baddoc(self):
        self.assertRaises(TypeError, self.db.copy, 'foo', object())

    def test_changes(self):
        self.db['foo'] = {'bar': True}
        self.assertEqual(self.db.changes(since=0)['last_seq'], 1)
        # With feed='continuous' the result is iterated; take the first item.
        first = next(self.db.changes(feed='continuous'))
        self.assertEqual(first['seq'], 1)
        self.assertEqual(first['id'], 'foo')

    def test_changes_releases_conn(self):
        # Consume an entire changes feed to read the whole response, then check
        # that the HTTP connection made it to the pool.
        list(self.db.changes(feed='continuous', timeout=0))
        scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2]
        self.assertTrue(self.db.resource.session.connection_pool.conns[(scheme, netloc)])

    def test_changes_releases_conn_when_lastseq(self):
        # Consume a changes feed, stopping at the 'last_seq' item, i.e. don't
        # let the generator run any further, then check the connection made it
        # to the pool.
        for obj in self.db.changes(feed='continuous', timeout=0):
            if 'last_seq' in obj:
                break
        scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2]
        self.assertTrue(self.db.resource.session.connection_pool.conns[(scheme, netloc)])

    def test_changes_conn_usable(self):
        # Consume a changes feed to get a used connection in the pool.
        list(self.db.changes(feed='continuous', timeout=0))
        # Try using the connection again to make sure the connection was left
        # in a good state from the previous request.
        self.assertTrue(self.db.info()['doc_count'] == 0)

    def test_changes_heartbeat(self):
        def wakeup():
            time.sleep(.3)
            self.db.save({})
        waker = threading.Thread(target=wakeup)
        waker.start()
        try:
            # Stop at the first change produced by the helper thread's save.
            for change in self.db.changes(feed='continuous', heartbeat=100):
                break
        finally:
            # Wait for the helper thread so it cannot outlive the test.
            waker.join()

    def test_purge(self):
        doc = {'a': 'b'}
        self.db['foo'] = doc
        self.assertEqual(self.db.purge([doc])['purge_seq'], 1)

    def test_json_encoding_error(self):
        # A datetime value is not JSON serializable here.
        doc = {'now': datetime.now()}
        self.assertRaises(TypeError, self.db.save, doc)

    def test_security(self):
        security = self.db.security
        self.assertEqual(security, {})
        # Only checks that assigning a new security object does not raise.
        security['members'] = {'names': ['test'], 'roles': []}
        self.db.security = security
525
526
class ViewTestCase(testutil.TempDatabaseMixin, unittest.TestCase):
    """Tests for view access and view result rows on a temporary database.

    ``self.db`` and ``self.server`` are not defined here; presumably they are
    provided by ``testutil.TempDatabaseMixin``.
    """

    def test_row_object(self):
        # A key without a matching document produces an error row.
        missing_rows = list(self.db.view('_all_docs', keys=['blah']))
        missing = missing_rows[0]
        self.assertEqual(missing.id, None)
        self.assertEqual(missing.key, 'blah')
        self.assertEqual(missing.value, None)
        self.assertEqual(missing.error, 'not_found')

        # Once the document exists the row carries id, key and a rev value.
        self.db.save({'_id': 'xyz', 'foo': 'bar'})
        found_rows = list(self.db.view('_all_docs', keys=['xyz']))
        found = found_rows[0]
        self.assertEqual(found.id, 'xyz')
        self.assertEqual(found.key, 'xyz')
        self.assertEqual(list(found.value.keys()), ['rev'])
        self.assertEqual(found.error, None)

    def test_view_multi_get(self):
        for number in range(1, 6):
            self.db.save({'i': number})
        view_def = {'map': 'function(doc) { emit(doc.i, null); }'}
        self.db['_design/test'] = {
            'language': 'javascript',
            'views': {'multi_key': view_def},
        }

        # Request the odd keys only and expect them back in that order.
        wanted = list(range(1, 6, 2))
        res = list(self.db.view('test/multi_key', keys=wanted))
        self.assertEqual(3, len(res))
        for position, expected_key in enumerate(range(1, 6, 2)):
            self.assertEqual(expected_key, res[position].key)

    def test_ddoc_info(self):
        view_def = {'map': 'function(doc) { emit(doc.type, null); }'}
        self.db['_design/test'] = {
            'language': 'javascript',
            'views': {'test': view_def},
        }
        # info() with a design document name returns that document's info.
        ddoc_info = self.db.info('test')
        self.assertEqual(ddoc_info['view_index']['compact_running'], False)

    def test_view_compaction(self):
        for number in range(1, 6):
            self.db.save({'i': number})
        view_def = {'map': 'function(doc) { emit(doc.i, null); }'}
        self.db['_design/test'] = {
            'language': 'javascript',
            'views': {'multi_key': view_def},
        }

        self.db.view('test/multi_key')
        compacted = self.db.compact('test')
        self.assertTrue(compacted)

    def test_view_cleanup(self):

        for number in range(1, 6):
            self.db.save({'i': number})

        first_view = {'map': 'function(doc) { emit(doc.i, null); }'}
        self.db['_design/test'] = {
            'language': 'javascript',
            'views': {'multi_key': first_view},
        }
        self.db.view('test/multi_key')

        # Replace the view definitions, then ask for a cleanup.
        ddoc = self.db['_design/test']
        second_view = {'map': 'function(doc) { emit(doc._id, null); }'}
        ddoc['views'] = {'ids': second_view}
        self.db.update([ddoc])
        self.db.view('test/ids')
        cleaned = self.db.cleanup()
        self.assertTrue(cleaned)

    def test_view_function_objects(self):
        # Nothing is checked unless the server lists a 'python' query server.
        query_servers = self.server.config()['query_servers']
        if 'python' not in query_servers:
            return

        for i in range(1, 4):
            self.db.save({'i': i, 'j':2*i})

        def map_fun(doc):
            yield doc['i'], doc['j']
        mapped = list(self.db.query(map_fun, language='python'))
        self.assertEqual(3, len(mapped))
        for position, expected in enumerate(range(1,4)):
            self.assertEqual(expected, mapped[position].key)
            self.assertEqual(2*expected, mapped[position].value)

        def reduce_fun(keys, values):
            return sum(values)
        reduced = list(self.db.query(map_fun, reduce_fun, 'python'))
        self.assertEqual(1, len(reduced))
        self.assertEqual(12, reduced[0].value)

    def test_init_with_resource(self):
        self.db['foo'] = {}
        all_docs_url = self.db.resource('_all_docs').url
        view = client.PermanentView(all_docs_url, '_all_docs')
        # Calling the view returns its results.
        row_count = len(list(view()))
        self.assertEqual(row_count, 1)

    def test_iter_view(self):
        self.db['foo'] = {}
        all_docs_url = self.db.resource('_all_docs').url
        view = client.PermanentView(all_docs_url, '_all_docs')
        # The view object itself is iterable.
        row_count = len(list(view))
        self.assertEqual(row_count, 1)

    def test_update_seq(self):
        self.db['foo'] = {}
        results = self.db.view('_all_docs', update_seq=True)
        self.assertEqual(results.update_seq, 1)

    def test_tmpview_repr(self):
        mapfunc = "function(doc) {emit(null, null);}"
        view = client.TemporaryView(self.db.resource('_temp_view'), mapfunc)
        # The repr names the class and includes the map function source.
        self.assertTrue('TemporaryView' in repr(view))
        self.assertTrue(mapfunc in repr(view))

    def test_wrapper_iter(self):
        class Wrapper(object):
            def __init__(self, doc):
                pass
        self.db['foo'] = {}
        # Rows obtained by iterating are passed through the wrapper.
        wrapped = list(self.db.view('_all_docs', wrapper=Wrapper))
        self.assertTrue(isinstance(wrapped[0], Wrapper))

    def test_wrapper_rows(self):
        class Wrapper(object):
            def __init__(self, doc):
                pass
        self.db['foo'] = {}
        # Rows obtained through the .rows attribute are wrapped as well.
        results = self.db.view('_all_docs', wrapper=Wrapper)
        self.assertTrue(isinstance(results.rows[0], Wrapper))

    def test_properties(self):
        for attr in ['rows', 'total_rows', 'offset']:
            # A fresh view result is created for every attribute.
            results = self.db.view('_all_docs')
            self.assertTrue(getattr(results, attr) is not None)

    def test_rowrepr(self):
        self.db['foo'] = {}
        map_src = "function(doc) {emit(null, 1);}"
        reduce_src = "function(keys, values, combine) {return sum(values);}"

        # A map-only row mentions 'id' in its repr ...
        rows = list(self.db.query(map_src))
        self.assertTrue('Row' in repr(rows[0]))
        self.assertTrue('id' in repr(rows[0]))
        # ... while a reduced row does not.
        rows = list(self.db.query(map_src, reduce_src))
        self.assertTrue('Row' in repr(rows[0]))
        self.assertTrue('id' not in repr(rows[0]))
671
672
class ShowListTestCase(testutil.TempDatabaseMixin, unittest.TestCase):
    """Tests for ``Database.show`` and ``Database.list``.

    ``self.db`` is not defined here; presumably it is provided by
    ``testutil.TempDatabaseMixin``.
    """

    # JavaScript show function: responds with "<request id>:<r query value>".
    show_func = """
        function(doc, req) {
            return {"body": req.id + ":" + (req.query.r || "<default>")};
        }
        """

    # JavaScript list function: emits one row id per CRLF-terminated line,
    # preceded by an 'id' header line when include_header is set.
    list_func = """
        function(head, req) {
            start({headers: {'Content-Type': 'text/csv'}});
            if (req.query.include_header) {
                send('id' + '\\r\\n');
            }
            var row;
            while (row = getRow()) {
                send(row.id + '\\r\\n');
            }
        }
        """

    design_doc = {
        '_id': '_design/foo',
        'shows': {'bar': show_func},
        'views': {
            'by_id': {'map': "function(doc) {emit(doc._id, null)}"},
            'by_name': {'map': "function(doc) {emit(doc.name, null)}"},
        },
        'lists': {'list': list_func},
    }

    def setUp(self):
        super(ShowListTestCase, self).setUp()
        # Workaround for possible bug in CouchDB. Adding a timestamp avoids a
        # 409 Conflict error when pushing the same design doc that existed in a
        # now deleted database.
        ddoc = dict(self.design_doc)
        ddoc['timestamp'] = time.time()
        self.db.save(ddoc)
        fixtures = [{'_id': '1', 'name': 'one'}, {'_id': '2', 'name': 'two'}]
        self.db.update(fixtures)

    def test_show_urls(self):
        # The full path and the short 'ddoc/show' form address the same show.
        for show_name in ('_design/foo/_show/bar', 'foo/bar'):
            body = self.db.show(show_name)[1].read()
            self.assertEqual(body, b'null:<default>')

    def test_show_docid(self):
        self.assertEqual(self.db.show('foo/bar')[1].read(), b'null:<default>')
        expectations = (('1', b'1:<default>'), ('2', b'2:<default>'))
        for docid, expected in expectations:
            body = self.db.show('foo/bar', docid)[1].read()
            self.assertEqual(body, expected)

    def test_show_params(self):
        # Extra keyword arguments end up in the request query.
        body = self.db.show('foo/bar', r='abc')[1].read()
        self.assertEqual(body, b'null:abc')

    def test_list(self):
        plain = self.db.list('foo/list', 'foo/by_id')[1].read()
        self.assertEqual(plain, b'1\r\n2\r\n')
        with_header = self.db.list('foo/list', 'foo/by_id', include_header='true')[1].read()
        self.assertEqual(with_header, b'id\r\n1\r\n2\r\n')

    def test_list_keys(self):
        body = self.db.list('foo/list', 'foo/by_id', keys=['1'])[1].read()
        self.assertEqual(body, b'1\r\n')

    def test_list_view_params(self):
        # View parameters are passed through to the underlying view.
        ranged = self.db.list('foo/list', 'foo/by_name', startkey='o', endkey='p')[1].read()
        self.assertEqual(ranged, b'1\r\n')
        reversed_rows = self.db.list('foo/list', 'foo/by_name', descending=True)[1].read()
        self.assertEqual(reversed_rows, b'2\r\n1\r\n')
732
733
class UpdateHandlerTestCase(testutil.TempDatabaseMixin, unittest.TestCase):
    """Tests for ``Database.update_doc`` using a JavaScript update handler.

    ``self.db`` is not defined here; presumably it is provided by
    ``testutil.TempDatabaseMixin``.
    """

    # JavaScript update handler with three outcomes: "empty doc" when there is
    # neither a document nor a request id, "new doc" when only a request id is
    # given, and "hello doc" when the document already exists.
    update_func = """
        function(doc, req) {
          if (!doc) {
            if (req.id) {
              return [{_id : req.id}, "new doc"]
            }
            return [null, "empty doc"];
          }
          doc.name = "hello";
          return [doc, "hello doc"];
        }
    """

    design_doc = {
        '_id': '_design/foo',
        'language': 'javascript',
        'updates': {'bar': update_func},
    }

    def setUp(self):
        super(UpdateHandlerTestCase, self).setUp()
        # Workaround for possible bug in CouchDB. Adding a timestamp avoids a
        # 409 Conflict error when pushing the same design doc that existed in a
        # now deleted database.
        ddoc = dict(self.design_doc)
        ddoc['timestamp'] = time.time()
        self.db.save(ddoc)
        existing = {'_id': 'existed', 'name': 'bar'}
        self.db.update([existing])

    def test_empty_doc(self):
        # No document id at all.
        body = self.db.update_doc('foo/bar')[1].read()
        self.assertEqual(body, b'empty doc')

    def test_new_doc(self):
        # A document id that does not exist yet.
        body = self.db.update_doc('foo/bar', 'new')[1].read()
        self.assertEqual(body, b'new doc')

    def test_update_doc(self):
        # The document stored by setUp().
        body = self.db.update_doc('foo/bar', 'existed')[1].read()
        self.assertEqual(body, b'hello doc')
770
771
class ViewIterationTestCase(testutil.TempDatabaseMixin, unittest.TestCase):
    """Tests for ``Database.iterview`` over ``num_docs`` generated documents.

    ``self.db`` is not defined here; presumably it is provided by
    ``testutil.TempDatabaseMixin``.
    """

    # Number of documents written by setUp().
    num_docs = 100

    def docfromnum(self, num):
        # Document for a number: the id is its text form, 'num' is half of it
        # (truncated), so two consecutive documents share one key.
        half = int(num / 2)
        return {'_id': util.utype(num), 'num': half}

    def docfromrow(self, row):
        # Rebuild the document shape of docfromnum() from a view row.
        return {'_id': row['id'], 'num': row['key']}

    def setUp(self):
        super(ViewIterationTestCase, self).setUp()
        views = {
            'nums': {'map': 'function(doc) {emit(doc.num, null);}'},
            'nulls': {'map': 'function(doc) {emit(null, null);}'},
        }
        self.db.save({'_id': '_design/test', 'views': views})
        docs = [self.docfromnum(num) for num in range(self.num_docs)]
        self.db.update(docs)

    def test_allrows(self):
        rows = list(self.db.iterview('test/nums', 10))
        self.assertEqual(len(rows), self.num_docs)
        actual = [self.docfromrow(row) for row in rows]
        expected = [self.docfromnum(num) for num in range(self.num_docs)]
        self.assertEqual(actual, expected)

    def test_batchsizes(self):
        # Check silly _batch values.
        for silly in (0, -1):
            self.assertRaises(ValueError, lambda: next(self.db.iterview('test/nums', silly)))
        # Test various _batch sizes that are likely to cause trouble.
        tricky_sizes = (
            1,
            int(self.num_docs / 2),
            self.num_docs * 2,
            self.num_docs - 1,
            self.num_docs,
            self.num_docs + 1,
        )
        for batch in tricky_sizes:
            rows = list(self.db.iterview('test/nums', batch))
            self.assertEqual(len(rows), self.num_docs)

    def test_batchsizes_with_skip(self):
        batch = self.num_docs // 10
        skipped = self.num_docs // 2
        rows = list(self.db.iterview('test/nums', batch, skip=skipped))
        self.assertEqual(len(rows), self.num_docs // 2)

    def test_limit(self):
        # limit=0 doesn't make sense for iterview.
        self.assertRaises(ValueError, lambda: next(self.db.iterview('test/nums', 10, limit=0)))
        # Test various limit sizes that are likely to cause trouble.
        limits = [1, int(self.num_docs / 4), self.num_docs - 1, self.num_docs,
                  self.num_docs + 1]
        for limit in limits:
            actual = [self.docfromrow(doc)
                      for doc in self.db.iterview('test/nums', 10, limit=limit)]
            expected = [self.docfromnum(x)
                        for x in range(min(limit, self.num_docs))]
            self.assertEqual(actual, expected)
        # Test limit same as batch size, in case of weird edge cases.
        limit = int(self.num_docs / 4)
        actual = [self.docfromrow(doc)
                  for doc in self.db.iterview('test/nums', limit, limit=limit)]
        expected = [self.docfromnum(x) for x in range(limit)]
        self.assertEqual(actual, expected)

    def test_descending(self):
        # All rows, highest number first.
        actual = [self.docfromrow(doc)
                  for doc in self.db.iterview('test/nums', 10, descending=True)]
        expected = [self.docfromnum(x)
                    for x in range(self.num_docs - 1, -1, -1)]
        self.assertEqual(actual, expected)

        # Only the top quarter, still highest number first.
        quarter = int(self.num_docs / 4)
        stop = int(self.num_docs * 3 / 4) - 1
        actual = [self.docfromrow(doc)
                  for doc in self.db.iterview('test/nums', 10, limit=quarter, descending=True)]
        expected = [self.docfromnum(x)
                    for x in range(self.num_docs - 1, stop, -1)]
        self.assertEqual(actual, expected)

    def test_startkey(self):
        # Starting at the highest key leaves only the last two documents.
        highest_key = int(self.num_docs / 2) - 1
        actual = [self.docfromrow(doc)
                  for doc in self.db.iterview('test/nums', 10, startkey=highest_key)]
        expected = [self.docfromnum(x)
                    for x in range(self.num_docs - 2, self.num_docs)]
        self.assertEqual(actual, expected)

        # Descending from key 1 covers documents 3 down to 0.
        actual = [self.docfromrow(doc)
                  for doc in self.db.iterview('test/nums', 10, startkey=1, descending=True)]
        expected = [self.docfromnum(x) for x in range(3, -1, -1)]
        self.assertEqual(actual, expected)

    def test_nullkeys(self):
        # Every document emits a null key in the 'nulls' view.
        rows = list(self.db.iterview('test/nulls', 10))
        self.assertEqual(len(rows), self.num_docs)
840
def suite():
    """Build the test suite for this module.

    Collects the ``test*`` methods of every test case class defined in this
    module and appends the suite returned by ``testutil.doctest_suite`` for
    the ``client`` module.

    ``unittest.makeSuite`` is avoided because it is deprecated since
    Python 3.11 and removed in Python 3.13; a ``TestLoader`` with its default
    ``'test'`` method prefix collects the same tests.
    """
    loader = unittest.TestLoader()
    tests = unittest.TestSuite()
    test_cases = (
        ServerTestCase,
        DatabaseTestCase,
        ViewTestCase,
        ShowListTestCase,
        UpdateHandlerTestCase,
        ViewIterationTestCase,
    )
    for test_case in test_cases:
        tests.addTest(loader.loadTestsFromTestCase(test_case))
    tests.addTest(testutil.doctest_suite(client))
    return tests
851
852
# When executed as a script, run the tests assembled by suite().
if __name__ == '__main__':
    unittest.main(defaultTest='suite')
855