xref: /openbsd/regress/sbin/slaacd/sniff_sol.py (revision a6e9806d)
1#!/usr/local/bin/python3
2# $OpenBSD: sniff_sol.py,v 1.2 2020/12/25 14:25:58 bluhm Exp $
3
4# Copyright (c) 2017 Florian Obser <florian@openbsd.org>
5#
6# Permission to use, copy, modify, and distribute this software for any
7# purpose with or without fee is hereby granted, provided that the above
8# copyright notice and this permission notice appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17
18import subprocess
19import threading
20from scapy.all import *
21
class Sniffer(threading.Thread):
	"""Thread that captures the first router solicitation on pair1.

	The captured packet, if any, is stored in ``p``.  When the capture
	is over, the attached ``executor`` is told to stop via stopit().
	"""
	# Object whose stopit() is called once sniffing is over; the caller
	# assigns it before start().
	executor = None
	# First sniffed packet; stays None when nothing was captured.
	p = None
	# pcap cannot access icmp6, type 133 = router solicitation
	filter = "icmp6 and ip6[40]=133"

	def run(self):
		"""Sniff at most one matching packet, then stop the executor."""
		try:
			answer = sniff(iface='pair1', count=1, timeout=30,
			    filter=self.filter)
			if answer:
				self.p = answer[0]
		finally:
			# Use the executor attached to this instance instead of
			# relying on a module global that happens to be named
			# "executor", and release it even when sniff() raises.
			if self.executor is not None:
				self.executor.stopit()
33
class Executor(threading.Thread):
	"""Thread that runs "slaacctl ... send sol pair2" until told to stop."""

	def __init__(self, *args, **kwargs):
		"""Accept the same arguments as threading.Thread."""
		super().__init__(*args, **kwargs)
		# One Event per instance.  A class-level Event is shared by all
		# Executor objects, so stopping one would stop every other one.
		self.event = threading.Event()

	def stopit(self):
		"""Set the stop event so that run() leaves its loop."""
		self.event.set()

	def run(self):
		"""Call slaacctl up to 30 times, waiting up to 1 second after each.

		sys.argv[1] is handed to slaacctl as the argument of its -s
		option.  The loop ends early as soon as the stop event is set.
		"""
		# Imported explicitly: the file never imports sys by name, so the
		# name would otherwise only exist if the scapy star import
		# happened to export it.
		import sys
		for _ in range(30):
			subprocess.call(['slaacctl', '-s', sys.argv[1],
			    'send', 'sol', 'pair2'])
			if self.event.wait(1):
				break
45
# Create the two worker threads and give the sniffer a reference to the
# executor.
sniffer = Sniffer()
executor = Executor()
sniffer.executor = executor

# Start the sniffer thread first, then the executor thread.
sniffer.start()
executor.start()

# Wait at most 30 seconds for each thread, sniffer first.
for worker in (sniffer, executor):
	worker.join(timeout=30)

# Packet stored by the sniffer, or None when nothing was captured.
p = sniffer.p
58
# Validate the sniffed packet.  Exit status is 2 when nothing was
# captured, 1 when one of the checks below fails, and 0 otherwise.
if p is None:
	print("no packet sniffed")
	exit(2)

# The frame must carry IPv6.
if p.type != ETH_P_IPV6:
	print("unexpected ethertype: {0}".format(p.type))
	exit(1)

# The IPv6 next header must be known to scapy and name ICMPv6.
if p.payload.nh not in ipv6nh or ipv6nh[p.payload.nh] != 'ICMPv6':
	print("unexpected next header: {0}".format(p.payload.nh))
	exit(1)

# The hop limit has to be exactly 255.
if p[IPv6].hlim != 255:
	print("invalid hlim: {0}".format(p[IPv6].hlim))
	exit(1)

# The destination has to be ff02::2 (all-routers multicast address).
if p[IPv6].dst != 'ff02::2':
	print("invalid IPv6 destination: {0}".format(p[IPv6].dst))
	exit(1)

if 'ICMPv6ND_RS' not in p[IPv6]:
	print("no router solicitation found")
	exit(1)

if 'ICMPv6NDOptSrcLLAddr' not in p[IPv6][ICMPv6ND_RS]:
	print("no Source Link-Layer Address option")
	exit(1)

# The link-layer address option has to match the Ethernet source.
if p[Ether].src != p[IPv6][ICMPv6ND_RS].lladdr:
	# {1} selects the second argument; using {0} twice would print the
	# source MAC for both values and hide the mismatching option.
	print("src mac ({0}) != lladdr option ({1})".format(p[Ether].src,
	    p[IPv6][ICMPv6ND_RS].lladdr))
	exit(1)

print("received router solicitation")
exit(0)
95