"""Snapshot tests that run a small Elasticsearch script under ``ddtrace-run``."""
import os
import subprocess
import sys

import six

from tests.utils import snapshot


# Template for the script executed in the subprocess. The single ``%s`` is
# replaced with the name of the Elasticsearch client module to import
# (e.g. ``elasticsearch7``). The script creates and then deletes one index.
code = """
import datetime
import os

import %s as elasticsearch

ELASTICSEARCH_CONFIG = {"port": int(os.getenv("TEST_ELASTICSEARCH_PORT", 9200)),}
ES_INDEX = "ddtrace_index"
ES_TYPE = "ddtrace_type"
mapping = {"mapping": {"properties": {"created": {"type": "date", "format": "yyyy-MM-dd"}}}}
es = elasticsearch.Elasticsearch(port=ELASTICSEARCH_CONFIG["port"])
es.indices.create(index=ES_INDEX, ignore=400, body=mapping)

args = {"index": ES_INDEX, "doc_type": ES_TYPE}
es.indices.delete(index=ES_INDEX, ignore=[400, 404])
"""


def do_test(tmpdir, es_version):
    """Run the Elasticsearch script under ``ddtrace-run`` and check it is clean.

    Writes ``code`` (with ``es_version`` substituted into the import) to
    ``test.py`` inside ``tmpdir``, executes it with ``ddtrace-run`` using the
    current interpreter, and asserts that the process wrote nothing to
    stderr or stdout and exited with status 0.

    :param tmpdir: Directory object providing ``join()``; ``str(tmpdir)`` is
        used as the subprocess working directory (presumably the pytest
        ``tmpdir`` fixture -- confirm against the test runner).
    :param es_version: Name of the Elasticsearch client module to import in
        the generated script, e.g. ``"elasticsearch"`` or ``"elasticsearch7"``.
    :raises AssertionError: If the subprocess produced any output or exited
        with a non-zero return code.
    """
    f = tmpdir.join("test.py")
    f.write(code % es_version)
    env = os.environ.copy()
    # ddtrace-run patches sqlite3 which is used by coverage to store coverage
    # results. This generates sqlite3 spans during the test run which interfere
    # with the snapshot. So disable sqlite3.
    env.update({"DD_TRACE_SQLITE3_ENABLED": "false"})
    p = subprocess.Popen(
        ["ddtrace-run", sys.executable, "test.py"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=str(tmpdir),
        env=env,
    )
    # communicate() drains both pipes while waiting for the child to exit.
    # Calling wait() first can deadlock when the child writes more than the
    # OS pipe buffer holds (e.g. a long traceback), and it leaves the pipe
    # file objects unclosed.
    stdout, stderr = p.communicate()
    assert stderr == six.b(""), stderr
    assert stdout == six.b(""), stdout
    assert p.returncode == 0


@snapshot(async_mode=False)
def test_elasticsearch(tmpdir):
    """Run the script against the ``elasticsearch`` module."""
    do_test(tmpdir, "elasticsearch")


@snapshot(async_mode=False)
def test_elasticsearch2(tmpdir):
    """Run the script against the ``elasticsearch2`` module."""
    do_test(tmpdir, "elasticsearch2")


@snapshot(async_mode=False)
def test_elasticsearch5(tmpdir):
    """Run the script against the ``elasticsearch5`` module."""
    do_test(tmpdir, "elasticsearch5")


@snapshot(async_mode=False)
def test_elasticsearch6(tmpdir):
    """Run the script against the ``elasticsearch6`` module."""
    do_test(tmpdir, "elasticsearch6")


@snapshot(async_mode=False)
def test_elasticsearch7(tmpdir):
    """Run the script against the ``elasticsearch7`` module."""
    do_test(tmpdir, "elasticsearch7")