1#!/usr/local/bin/python3 2 3import os 4import threading 5import string 6import random 7from addr import * 8from scapy.all import * 9 10class Sniff(threading.Thread): 11 filter = None 12 captured = None 13 packet = None 14 def __init__(self): 15 # clear packets buffered by scapy bpf 16 sniff(iface=LOCAL_IF, timeout=1) 17 super(Sniff, self).__init__() 18 def run(self): 19 self.captured = sniff(iface=LOCAL_IF, filter=self.filter, 20 timeout=3) 21 if self.captured: 22 self.packet = self.captured[0] 23 24e=Ether(src=LOCAL_MAC, dst=REMOTE_MAC) 25ip6=IPv6(src=FAKE_NET_ADDR6, dst=REMOTE_ADDR6) 26uport=os.getpid() & 0xffff 27# inetd ignores UDP packets from privileged port or nfs 28if uport < 1024 or uport == 2049: 29 uport+=1024 30 31print("Send UDP packet with 1200 octets payload, receive echo.") 32data=''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + 33 string.digits) for _ in range(1200)) 34udp=UDP(sport=uport, dport='echo')/data 35echo=srp1(e/ip6/udp, iface=LOCAL_IF, timeout=5) 36 37if echo is None: 38 print("ERROR: No UDP answer from echo server received.") 39 exit(1) 40 41print("Send ICMP6 packet too big packet with MTU 1272.") 42icmp6=ICMPv6PacketTooBig(mtu=1272)/echo.payload 43sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/icmp6, iface=LOCAL_IF) 44 45print("Clear route cache at echo socket by sending from different address.") 46sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/udp, iface=LOCAL_IF) 47 48# srp1 cannot be used, fragment answer will not match on outgoing UDP packet 49sniffer = Sniff() 50sniffer.filter = \ 51 "ip6 and src "+ip6.dst+" and dst "+ip6.src+" and proto ipv6-frag" 52sniffer.start() 53time.sleep(1) 54 55print("Send UDP packet with 1200 octets payload.") 56sendp(e/ip6/udp, iface=LOCAL_IF) 57 58print("Path MTU discovery will not send UDP atomic fragment.") 59sniffer.join(timeout=5) 60 61print("IPv6 atomic fragments must not be generated.") 62frag=None 63for a in sniffer.captured: 64 fh=a.payload.payload 65 if fh.offset != 0 or fh.nh != (ip6/udp).nh: 66 continue 67 uh=fh.payload 68 if uh.sport != udp.dport or uh.dport != udp.sport: 69 continue 70 frag=a 71 break 72 73if frag is not None: 74 print("ERROR: Matching IPv6 fragment UDP answer found.") 75 exit(1) 76 77print("Send echo again and expect reply without fragmentation.") 78reply=srp1(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/udp, iface=LOCAL_IF) 79 80print("UDP echo has IPv6 and UDP header, so expected payload len is 1248.") 81elen = reply.plen + len(IPv6()) 82print("rlen=%d" % elen) 83if elen != 1248: 84 print("ERROR: UDP reply payload len is %d, expected 1248." % elen) 85 exit(1) 86 87exit(0) 88