1# Copyright (c) 2015 RIPE NCC
2#
3# This program is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
16import os
17import unittest
18from datetime import datetime, timedelta
19from collections import namedtuple
20
21from nose.exc import SkipTest
22
23
24from ripe.atlas.cousteau import (
25    AtlasSource, AtlasChangeSource,
26    AtlasRequest, AtlasCreateRequest, AtlasChangeRequest,
27    Ping, Dns, AtlasStopRequest, AtlasResultsRequest,
28    ProbeRequest, MeasurementRequest, Probe, Measurement,
29    AtlasStream, Ntp, Sslcert, Http, Traceroute, AnchorRequest
30)
31
32
33class TestRealServer(unittest.TestCase):
34    def setUp(self):
35        self.server = os.environ.get('ATLAS_SERVER', "")
36        self.create_key = os.environ.get('CREATE_KEY', "")
37        self.change_key = os.environ.get('CHANGE_KEY', "")
38        self.delete_key = os.environ.get('DELETE_KEY', "")
39        self.delete_msm = None
40
41    def test_create_delete_request(self):
42        """Unittest for Atlas create and delete request"""
43        if self.server == "":
44            raise SkipTest
45        source = AtlasSource(**{"type": "area", "value": "WW", "requested": 38})
46        ping = Ping(**{
47            "target": "www.ripe.net",
48            "af": 4,
49            "description": "Cousteau testing",
50            "prefer_anchors": True
51        })
52        traceroute = Traceroute(**{
53            "target": "www.google.fr",
54            "af": 4, "protocol": "UDP",
55            "description": "Cousteau testing",
56            "dont_fragment": True
57        })
58        dns = Dns(**{
59            "target": "k.root-servers.net", "af": 4,
60            "description": "Cousteau testing", "query_type": "SOA",
61            "query_class": "IN", "query_argument": "nl", "retry": 6
62        })
63        ntp = Ntp(**{
64            "target": "www.ripe.net",
65            "af": 4,
66            "description": "Cousteau testing",
67            "timeout": 1000
68        })
69        ssl = Sslcert(**{
70            "target": "www.ripe.net",
71            "af": 4,
72            "description": "Cousteau testing",
73        })
74        http = Http(**{
75            "target": "www.ripe.net",
76            "af": 4,
77            "description": "Cousteau testing",
78        })
79        stop = datetime.utcnow() + timedelta(minutes=220)
80        request = AtlasCreateRequest(
81            **{
82                "verify": False,
83                "stop_time": stop,
84                "key": self.create_key,
85                "server": self.server,
86                "measurements": [ping, traceroute, dns, ntp, ssl, http],
87                "sources": [source]
88            }
89        )
90        result = namedtuple('Result', 'success response')
91        (result.success, result.response) = request.create()
92        print(result.response)
93        self.assertTrue(result.success)
94        self.delete_msm = result.response["measurements"][0]
95        self.assertTrue(result.success)
96
97        # Unittest for Atlas delete request
98        if self.server == "":
99            raise SkipTest
100
101        kwargs = {"verify": False, "msm_id": self.delete_msm, "key": self.delete_key, "server": self.server}
102        request = AtlasStopRequest(**kwargs)
103        result = namedtuple('Result', 'success response')
104        (result.success, result.response) = request.create()
105        print(result.response)
106        self.assertTrue(result.success)
107
108    def test_change_request(self):
109        """Unittest for Atlas change request"""
110        if self.server == "":
111            raise SkipTest
112
113        remove = AtlasChangeSource(**{
114            "value": "6001", "requested": 1, "action": "remove", "type": "probes"
115        })
116        add = AtlasChangeSource(**{
117            "value": "6002", "requested": 1, "action": "add", "type": "probes"
118        })
119        request = AtlasChangeRequest(
120            **{
121                "key": self.change_key,
122                "verify": False,
123                "msm_id": 1000032,
124                "server": self.server,
125                "sources": [add, remove]
126            }
127        )
128        result = namedtuple('Result', 'success response')
129        (result.success, result.response) = request.create()
130        print(result.response)
131        self.assertTrue(result.success)
132
133    def test_result_request(self):
134        """Unittest for Atlas results request"""
135        if self.server == "":
136            raise SkipTest
137
138        kwargs = {
139            "msm_id": 1000032,
140            "start": datetime(2011, 11, 21),
141            "stop": datetime(2011, 11, 22),
142            "verify": False,
143            "probe_ids": [743, 630]
144        }
145
146        result = namedtuple('Result', 'success response')
147        (result.success, result.response) = AtlasResultsRequest(**kwargs).create()
148        print(result.success, result.response)
149        self.assertTrue(result.response)
150        self.assertTrue(result.success)
151
152    def test_probe_request(self):
153        """Unittest for ProbeRequest"""
154        if self.server == "":
155            raise SkipTest
156
157        filters = {"tags": "NAT", "country_code": "NL", "asn_v4": "3333"}
158        probes = ProbeRequest(**filters)
159        probes_list = list(probes)
160        self.assertTrue(probes_list)
161        self.assertTrue(probes.total_count)
162
163    def test_measurement_request(self):
164        """Unittest for MeasurementRequest"""
165        if self.server == "":
166            raise SkipTest
167
168        filters = {"id__lt": 1000010, "id__gt": 1000002}
169        measurements = MeasurementRequest(**filters)
170        measurements_list = list(measurements)
171        self.assertTrue(measurements_list)
172        self.assertTrue(measurements.total_count)
173
174    def test_anchor_request(self):
175        """Unittest for AnchorRequest"""
176        if self.server == "":
177            raise SkipTest
178
179        anchors = AnchorRequest()
180        anchors_list = list(anchors)
181        self.assertTrue(anchors_list)
182        self.assertTrue(anchors.total_count)
183
184    def test_probe_repr_request(self):
185        """Unittest for Probe representation request"""
186        if self.server == "":
187            raise SkipTest
188
189        Probe(id=6001)
190
191    def test_measurement_repr_request(self):
192        """Unittest for Measurement representation request"""
193        if self.server == "":
194            raise SkipTest
195
196        Measurement(id=1000032, server=self.server, verify=False)
197
198    def test_stream_results(self):
199        """Unittest for Atlas results request."""
200        if self.server == "":
201            raise SkipTest
202
203        results = []
204
205        def on_result_response(*args):
206            """
207            Function that will be called every time we receive a new result.
208            Args is a tuple, so you should use args[0] to access the real message.
209            """
210            results.append(args[0])
211
212        atlas_stream = AtlasStream()
213        atlas_stream.connect()
214        channel = "result"
215        atlas_stream.bind_channel(channel, on_result_response)
216        stream_parameters = {"msm": 1001}
217        atlas_stream.start_stream(stream_type="result", **stream_parameters)
218        atlas_stream.timeout(seconds=5)
219        atlas_stream.disconnect()
220        self.assertNotEqual(results, [])
221
222    def test_stream_probe(self):
223        """Unittest for Atlas probe connections request."""
224        if self.server == "":
225            raise SkipTest
226
227        results = []
228
229        def on_result_response(*args):
230            """
231            Function that will be called every time we receive a new event.
232            Args is a tuple, so you should use args[0] to access the real message.
233            """
234            results.append(args[0])
235
236        atlas_stream = AtlasStream()
237        atlas_stream.connect()
238        channel = "atlas_probestatus"
239        atlas_stream.bind_channel(channel, on_result_response)
240        stream_parameters = {"enrichProbes": True}
241        atlas_stream.start_stream(stream_type="probestatus", **stream_parameters)
242        atlas_stream.timeout(seconds=30)
243        atlas_stream.disconnect()
244        self.assertNotEqual(results, [])
245
246    def test_get_request(self):
247        """Unittest for Atlas get request"""
248        if self.server == "":
249            raise SkipTest
250
251        request = AtlasRequest(
252            **{
253                "verify": False,
254                "url_path": "/api/v2/anchors"
255            }
256        )
257        result = namedtuple('Result', 'success response')
258        (result.success, result.response) = request.get()
259        print(result.success, result.response)
260        self.assertTrue(result.response["results"])
261        self.assertTrue(result.success)
262