# Copyright (c) 2015 RIPE NCC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import unittest
from datetime import datetime, timedelta
from collections import namedtuple

from nose.exc import SkipTest


from ripe.atlas.cousteau import (
    AtlasSource, AtlasChangeSource,
    AtlasRequest, AtlasCreateRequest, AtlasChangeRequest,
    Ping, Dns, AtlasStopRequest, AtlasResultsRequest,
    ProbeRequest, MeasurementRequest, Probe, Measurement,
    AtlasStream, Ntp, Sslcert, Http, Traceroute, AnchorRequest
)

# Names the two items of the pair unpacked from the request calls below
# (``create()`` / ``get()``), so tests can read ``result.success`` and
# ``result.response`` from a real instance.
Result = namedtuple('Result', 'success response')


class TestRealServer(unittest.TestCase):
    """Tests that issue requests through the cousteau client classes.

    Every test is skipped unless the ``ATLAS_SERVER`` environment variable
    is set to a non-empty value. API keys are read from ``CREATE_KEY``,
    ``CHANGE_KEY`` and ``DELETE_KEY``.
    """

    def setUp(self):
        """Read the configuration from the environment, or skip the test."""
        self.server = os.environ.get('ATLAS_SERVER', "")
        self.create_key = os.environ.get('CREATE_KEY', "")
        self.change_key = os.environ.get('CHANGE_KEY', "")
        self.delete_key = os.environ.get('DELETE_KEY', "")
        self.delete_msm = None

        # Raising SkipTest from setUp skips the test, so the guard lives
        # here once instead of being repeated at the top of every test.
        if not self.server:
            raise SkipTest

    def test_create_delete_request(self):
        """Unittest for Atlas create and delete request"""
        source = AtlasSource(type="area", value="WW", requested=38)
        ping = Ping(
            target="www.ripe.net",
            af=4,
            description="Cousteau testing",
            prefer_anchors=True
        )
        traceroute = Traceroute(
            target="www.google.fr",
            af=4, protocol="UDP",
            description="Cousteau testing",
            dont_fragment=True
        )
        dns = Dns(
            target="k.root-servers.net", af=4,
            description="Cousteau testing", query_type="SOA",
            query_class="IN", query_argument="nl", retry=6
        )
        ntp = Ntp(
            target="www.ripe.net",
            af=4,
            description="Cousteau testing",
            timeout=1000
        )
        ssl = Sslcert(
            target="www.ripe.net",
            af=4,
            description="Cousteau testing",
        )
        http = Http(
            target="www.ripe.net",
            af=4,
            description="Cousteau testing",
        )
        stop = datetime.utcnow() + timedelta(minutes=220)
        request = AtlasCreateRequest(
            verify=False,
            stop_time=stop,
            key=self.create_key,
            server=self.server,
            measurements=[ping, traceroute, dns, ntp, ssl, http],
            sources=[source]
        )
        result = Result(*request.create())
        print(result.response)
        self.assertTrue(result.success)
        # Remember the first created measurement so it can be stopped below.
        self.delete_msm = result.response["measurements"][0]

        # Unittest for Atlas delete request
        request = AtlasStopRequest(
            verify=False,
            msm_id=self.delete_msm,
            key=self.delete_key,
            server=self.server
        )
        result = Result(*request.create())
        print(result.response)
        self.assertTrue(result.success)

    def test_change_request(self):
        """Unittest for Atlas change request"""
        remove = AtlasChangeSource(
            value="6001", requested=1, action="remove", type="probes"
        )
        add = AtlasChangeSource(
            value="6002", requested=1, action="add", type="probes"
        )
        request = AtlasChangeRequest(
            key=self.change_key,
            verify=False,
            msm_id=1000032,
            server=self.server,
            sources=[add, remove]
        )
        result = Result(*request.create())
        print(result.response)
        self.assertTrue(result.success)

    def test_result_request(self):
        """Unittest for Atlas results request"""
        kwargs = {
            "msm_id": 1000032,
            "start": datetime(2011, 11, 21),
            "stop": datetime(2011, 11, 22),
            "verify": False,
            "probe_ids": [743, 630]
        }

        result = Result(*AtlasResultsRequest(**kwargs).create())
        print(result.success, result.response)
        self.assertTrue(result.response)
        self.assertTrue(result.success)

    def test_probe_request(self):
        """Unittest for ProbeRequest"""
        filters = {"tags": "NAT", "country_code": "NL", "asn_v4": "3333"}
        probes = ProbeRequest(**filters)
        probes_list = list(probes)
        self.assertTrue(probes_list)
        self.assertTrue(probes.total_count)

    def test_measurement_request(self):
        """Unittest for MeasurementRequest"""
        filters = {"id__lt": 1000010, "id__gt": 1000002}
        measurements = MeasurementRequest(**filters)
        measurements_list = list(measurements)
        self.assertTrue(measurements_list)
        self.assertTrue(measurements.total_count)

    def test_anchor_request(self):
        """Unittest for AnchorRequest"""
        anchors = AnchorRequest()
        anchors_list = list(anchors)
        self.assertTrue(anchors_list)
        self.assertTrue(anchors.total_count)

    def test_probe_repr_request(self):
        """Unittest for Probe representation request"""
        # Passes as long as constructing the object does not raise.
        Probe(id=6001)

    def test_measurement_repr_request(self):
        """Unittest for Measurement representation request"""
        # Passes as long as constructing the object does not raise.
        Measurement(id=1000032, server=self.server, verify=False)

    def test_stream_results(self):
        """Unittest for Atlas result stream."""
        results = []

        def on_result_response(*args):
            """
            Function that will be called every time we receive a new result.
            Args is a tuple, so you should use args[0] to access the real message.
            """
            results.append(args[0])

        atlas_stream = AtlasStream()
        atlas_stream.connect()
        try:
            channel = "result"
            atlas_stream.bind_channel(channel, on_result_response)
            stream_parameters = {"msm": 1001}
            atlas_stream.start_stream(stream_type="result", **stream_parameters)
            atlas_stream.timeout(seconds=5)
        finally:
            # Always disconnect, even if binding or streaming raised.
            atlas_stream.disconnect()
        self.assertNotEqual(results, [])

    def test_stream_probe(self):
        """Unittest for Atlas probe connections request."""
        results = []

        def on_result_response(*args):
            """
            Function that will be called every time we receive a new event.
            Args is a tuple, so you should use args[0] to access the real message.
            """
            results.append(args[0])

        atlas_stream = AtlasStream()
        atlas_stream.connect()
        try:
            channel = "atlas_probestatus"
            atlas_stream.bind_channel(channel, on_result_response)
            stream_parameters = {"enrichProbes": True}
            atlas_stream.start_stream(stream_type="probestatus", **stream_parameters)
            atlas_stream.timeout(seconds=30)
        finally:
            # Always disconnect, even if binding or streaming raised.
            atlas_stream.disconnect()
        self.assertNotEqual(results, [])

    def test_get_request(self):
        """Unittest for Atlas get request"""
        request = AtlasRequest(
            verify=False,
            url_path="/api/v2/anchors"
        )
        result = Result(*request.get())
        print(result.success, result.response)
        self.assertTrue(result.response["results"])
        self.assertTrue(result.success)