1import os
2import subprocess
3import sys
4
5import six
6
7from tests.utils import snapshot
8
9
# Source of the script that do_test() writes to ``test.py`` and executes with
# ``ddtrace-run``. The single ``%s`` placeholder is filled with the name of
# the elasticsearch module to import (e.g. "elasticsearch7").
#
# The script connects to the port given by TEST_ELASTICSEARCH_PORT (default
# 9200), issues an index-create request for ES_INDEX and then an index-delete
# request for the same index. ``datetime``, ``ES_TYPE`` and ``args`` are
# defined but not otherwise used by the script.
code = """
import datetime
import os

import %s as elasticsearch

ELASTICSEARCH_CONFIG = {"port": int(os.getenv("TEST_ELASTICSEARCH_PORT", 9200)),}
ES_INDEX = "ddtrace_index"
ES_TYPE = "ddtrace_type"
mapping = {"mapping": {"properties": {"created": {"type": "date", "format": "yyyy-MM-dd"}}}}
es = elasticsearch.Elasticsearch(port=ELASTICSEARCH_CONFIG["port"])
es.indices.create(index=ES_INDEX, ignore=400, body=mapping)

args = {"index": ES_INDEX, "doc_type": ES_TYPE}
es.indices.delete(index=ES_INDEX, ignore=[400, 404])
"""
26
27
def do_test(tmpdir, es_version):
    """Run the elasticsearch script under ``ddtrace-run`` and assert a clean exit.

    The module-level ``code`` template is rendered with ``es_version``,
    written to ``test.py`` inside ``tmpdir`` and executed in a subprocess.

    :param tmpdir: directory object providing ``join()``; it is used both as
        the location of the generated script and as the subprocess working
        directory.
    :param es_version: name of the elasticsearch module substituted into the
        script's import statement (e.g. ``"elasticsearch7"``).
    :raises AssertionError: if the subprocess writes anything to stdout or
        stderr, or exits with a non-zero return code.
    """
    f = tmpdir.join("test.py")
    f.write(code % es_version)
    env = os.environ.copy()
    # ddtrace-run patches sqlite3 which is used by coverage to store coverage
    # results. This generates sqlite3 spans during the test run which interfere
    # with the snapshot. So disable sqlite3.
    env.update({"DD_TRACE_SQLITE3_ENABLED": "false"})
    p = subprocess.Popen(
        ["ddtrace-run", sys.executable, "test.py"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=str(tmpdir),
        env=env,
    )
    # communicate() drains both pipes while waiting for the child to exit.
    # Calling wait() first can deadlock: if the child fills a pipe buffer
    # (e.g. with a long traceback) it blocks forever because nobody is reading.
    # communicate() also closes the pipe file objects once the child is done.
    stdout, stderr = p.communicate()
    assert stderr == six.b(""), stderr
    assert stdout == six.b(""), stdout
    assert p.returncode == 0
49
50
@snapshot(async_mode=False)
def test_elasticsearch(tmpdir):
    """Run the generated script with the ``elasticsearch`` module imported."""
    do_test(tmpdir, es_version="elasticsearch")
54
55
@snapshot(async_mode=False)
def test_elasticsearch2(tmpdir):
    """Run the generated script with the ``elasticsearch2`` module imported."""
    do_test(tmpdir, es_version="elasticsearch2")
59
60
@snapshot(async_mode=False)
def test_elasticsearch5(tmpdir):
    """Run the generated script with the ``elasticsearch5`` module imported."""
    do_test(tmpdir, es_version="elasticsearch5")
64
65
@snapshot(async_mode=False)
def test_elasticsearch6(tmpdir):
    """Run the generated script with the ``elasticsearch6`` module imported."""
    do_test(tmpdir, es_version="elasticsearch6")
69
70
@snapshot(async_mode=False)
def test_elasticsearch7(tmpdir):
    """Run the generated script with the ``elasticsearch7`` module imported."""
    do_test(tmpdir, es_version="elasticsearch7")
74