diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index 18775d50291..4f06c19d443 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -92,7 +92,7 @@ class debug: Automatically enabled when a generator is passed as the packet :param _flood: :param threaded: if True, packets are sent in a thread and received in another. - defaults to False. + Defaults to True. :param session: a flow decoder used to handle stream of packets :param chainEX: if True, exceptions during send will be forwarded :param stop_filter: Python function applied to each packet to determine if @@ -128,7 +128,7 @@ def __init__(self, rcv_pks=None, # type: Optional[SuperSocket] prebuild=False, # type: bool _flood=None, # type: Optional[_FloodGenerator] - threaded=False, # type: bool + threaded=True, # type: bool session=None, # type: Optional[_GlobSessionType] chainEX=False, # type: bool stop_filter=None # type: Optional[Callable[[Packet], bool]] @@ -158,7 +158,7 @@ def __init__(self, self.noans = 0 self._flood = _flood self.threaded = threaded - self.breakout = False + self.breakout = Event() # Instantiate packet holders if prebuild and not self._flood: self.tobesent = list(pkt) # type: _PacketIterable @@ -174,6 +174,7 @@ def __init__(self, self.timeout = None while retry >= 0: + self.breakout.clear() self.hsent = {} # type: Dict[bytes, List[Packet]] if threaded or self._flood: @@ -190,7 +191,7 @@ def __init__(self, except KeyboardInterrupt as ex: interrupted = ex - self.breakout = True + self.breakout.set() # Ended. Let's close gracefully if self._flood: @@ -251,6 +252,12 @@ def results(self): # type: () -> Tuple[SndRcvList, PacketList] return self.ans_result, self.unans_result + def _stop_sniffer_if_done(self) -> None: + """Close the sniffer if all expected answers have been received""" + if self._send_done and self.noans >= self.notans and not self.multi: + if self.sniffer and self.sniffer.running: + self.sniffer.stop(join=False) + def _sndrcv_snd(self): # type: () -> None """Function used in the sending thread of sndrcv()""" @@ -258,7 +265,7 @@ def _sndrcv_snd(self): p = None try: if self.verbose: - print("Begin emission:") + os.write(1, b"Begin emission\n") for p in self.tobesent: # Populate the dictionary of _sndrcv_rcv # _sndrcv_rcv won't miss the answer of a packet that @@ -266,13 +273,12 @@ def _sndrcv_snd(self): self.hsent.setdefault(p.hashret(), []).append(p) # Send packet self.pks.send(p) - if self.inter: - time.sleep(self.inter) - if self.breakout: + time.sleep(self.inter) + if self.breakout.is_set(): break i += 1 if self.verbose: - print("Finished sending %i packets." % i) + os.write(1, b"\nFinished sending %i packets\n" % i) except SystemExit: pass except Exception: @@ -291,13 +297,10 @@ def _sndrcv_snd(self): elif not self._send_done: self.notans = i self._send_done = True - # In threaded mode, timeout. - if self.threaded and self.timeout is not None and not self.breakout: - t = time.monotonic() + self.timeout - while time.monotonic() < t: - if self.breakout: - break - time.sleep(0.1) + self._stop_sniffer_if_done() + # In threaded mode, timeout + if self.threaded and self.timeout is not None and not self.breakout.is_set(): + self.breakout.wait(timeout=self.timeout) if self.sniffer and self.sniffer.running: self.sniffer.stop() @@ -324,9 +327,7 @@ def _process_packet(self, r): self.noans += 1 sentpkt._answered = 1 break - if self._send_done and self.noans >= self.notans and not self.multi: - if self.sniffer and self.sniffer.running: - self.sniffer.stop(join=False) + self._stop_sniffer_if_done() if not ok: if self.verbose > 1: os.write(1, b".") @@ -342,7 +343,7 @@ def _sndrcv_rcv(self, callback): self.sniffer = AsyncSniffer() self.sniffer._run( prn=self._process_packet, - timeout=None if self.threaded else self.timeout, + timeout=None if self.threaded and not self._flood else self.timeout, store=False, opened_socket=self.rcv_pks, session=self.session, diff --git a/test/contrib/automotive/doip.uts b/test/contrib/automotive/doip.uts index 7f4467c6d76..9a7d61e3b7d 100644 --- a/test/contrib/automotive/doip.uts +++ b/test/contrib/automotive/doip.uts @@ -416,6 +416,7 @@ import tempfile = Test DoIPSocket server_up = threading.Event() +sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -426,6 +427,7 @@ def server(): sock.listen(1) server_up.set() connection, address = sock.accept() + sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: @@ -437,7 +439,7 @@ server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(activate_routing=False) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 @@ -446,6 +448,7 @@ assert len(pkts) == 2 ~ linux server_up = threading.Event() +sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -456,6 +459,7 @@ def server(): sock.listen(1) server_up.set() connection, address = sock.accept() + sniff_up.wait(timeout=1) for i in range(len(buffer)): connection.send(buffer[i:i+1]) time.sleep(0.01) @@ -469,13 +473,14 @@ server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(activate_routing=False) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test DoIPSocket 3 server_up = threading.Event() +sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -486,6 +491,7 @@ def server(): sock.listen(1) server_up.set() connection, address = sock.accept() + sniff_up.wait(timeout=1) while buffer: randlen = random.randint(0, len(buffer)) connection.send(buffer[:randlen]) @@ -501,7 +507,7 @@ server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(activate_routing=False) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 @@ -509,6 +515,7 @@ assert len(pkts) == 2 = Test DoIPSocket6 server_up = threading.Event() +sniff_up = threading.Event() def server(): buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4' sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) @@ -519,6 +526,7 @@ def server(): sock.listen(1) server_up.set() connection, address = sock.accept() + sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: @@ -530,7 +538,7 @@ server_thread.start() server_up.wait(timeout=1) sock = DoIPSocket(ip="::1", activate_routing=False) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 @@ -604,6 +612,7 @@ def _load_certificate_chain(context) -> None: server_up = threading.Event() +sniff_up = threading.Event() def server(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) @@ -619,6 +628,7 @@ def server(): ssock.listen(1) server_up.set() connection, address = ssock.accept() + sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: @@ -633,7 +643,7 @@ context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = DoIPSocket(activate_routing=False, force_tls=True, context=context) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 @@ -641,6 +651,7 @@ assert len(pkts) == 2 ~ broken_windows server_up = threading.Event() +sniff_up = threading.Event() def server(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) @@ -656,6 +667,7 @@ def server(): ssock.listen(1) server_up.set() connection, address = ssock.accept() + sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: @@ -670,7 +682,7 @@ context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 @@ -678,6 +690,7 @@ assert len(pkts) == 2 ~ broken_windows server_up = threading.Event() +sniff_up = threading.Event() def server(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) @@ -693,6 +706,7 @@ def server(): ssock.listen(1) server_up.set() connection, address = ssock.accept() + sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: @@ -707,15 +721,16 @@ context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_thread.join(timeout=1) assert len(pkts) == 2 = Test UDS_DualDoIPSslSocket6 -~ broken_windows +~ broken_windows not_pypy server_tcp_up = threading.Event() server_tls_up = threading.Event() +sniff_up = threading.Event() def server_tls(): context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) _load_certificate_chain(context) @@ -732,6 +747,7 @@ def server_tls(): ssock.listen(1) server_tls_up.set() connection, address = ssock.accept() + sniff_up.wait(timeout=1) connection.send(buffer) connection.close() finally: @@ -748,7 +764,7 @@ def server_tcp(): server_tcp_up.set() connection, address = sock.accept() connection.send(buffer) - connection.shutdown() + connection.shutdown(socket.SHUT_RDWR) connection.close() finally: sock.close() @@ -767,7 +783,7 @@ context.verify_mode = ssl.CERT_NONE sock = UDS_DoIPSocket(ip="::1", context=context) -pkts = sock.sniff(timeout=1, count=2) +pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set) server_tcp_thread.join(timeout=1) server_tls_thread.join(timeout=1) assert len(pkts) == 2 diff --git a/test/contrib/automotive/scanner/enumerator.uts b/test/contrib/automotive/scanner/enumerator.uts index 70f725a51ce..b1ac0cc8716 100644 --- a/test/contrib/automotive/scanner/enumerator.uts +++ b/test/contrib/automotive/scanner/enumerator.uts @@ -219,7 +219,19 @@ class MockISOTPSocket(SuperSocket): return len(sx) @staticmethod def select(sockets, remain=None): + time.sleep(0) return sockets + def sr(self, *args, **kargs): + from scapy import sendrecv + return sendrecv.sndrcv(self, *args, threaded=False, **kargs) + def sr1(self, *args, **kargs): + from scapy import sendrecv + ans = sendrecv.sndrcv(self, *args, threaded=False, **kargs)[0] # type: SndRcvList + if len(ans) > 0: + pkt = ans[0][1] # type: Packet + return pkt + else: + return None sock = MockISOTPSocket() sock.rcvd_queue.put(b"\x41") diff --git a/test/regression.uts b/test/regression.uts index 1612bda7422..bddaf06315d 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -1832,7 +1832,7 @@ sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ssck = StreamSocket(sck) try: - r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True) + r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True, threaded=False) assert False except Exception: assert True @@ -2132,7 +2132,7 @@ retry_test(_test) ~ netaccess needs_root IP ICMP def _test(): packet = IP(dst="8.8.8.8")/ICMP() - r = srflood(packet, timeout=2) + r = srflood(packet, timeout=0.5) assert packet.sent_time is not None retry_test(_test) @@ -2142,7 +2142,7 @@ retry_test(_test) def _test(): packet1 = IP(dst="8.8.8.8")/ICMP() packet2 = IP(dst="8.8.4.4")/ICMP() - r = srflood([packet1, packet2], timeout=2) + r = srflood([packet1, packet2], timeout=0.5) assert packet1.sent_time is not None assert packet2.sent_time is not None diff --git a/test/sendsniff.uts b/test/sendsniff.uts index 1a2ed99c4a5..4a69695f666 100644 --- a/test/sendsniff.uts +++ b/test/sendsniff.uts @@ -395,3 +395,42 @@ finally: e = os.system("ip netns del blob1") conf.ifaces.reload() conf.route.resync() + + += sr() performance test +~ linux needs_root veth not_pypy + +import subprocess +import shlex + +try: + # Create a dedicated network name space to simulate remote host + subprocess.check_call(shlex.split("sudo ip netns add scapy")) + # Create a virtual Ethernet pair to connect default and new NS + subprocess.check_call(shlex.split("sudo ip link add type veth")) + # Move veth1 to the new NS + subprocess.check_call(shlex.split("sudo ip link set veth1 netns scapy")) + # Setup vNIC in the default NS + subprocess.check_call(shlex.split("sudo ip link set veth0 up")) + subprocess.check_call(shlex.split("sudo ip addr add 192.168.168.1/24 dev veth0")) + # Setup vNIC in the dedicated NS + subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo up")) + subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 up")) + subprocess.check_call(shlex.split("sudo ip netns exec scapy ip addr add 192.168.168.2/24 dev veth1")) + # Perform test + conf.route.resync() + res, unansw = sr(IP(dst='192.168.168.2') / ICMP(seq=(1, 1000)), timeout=1, verbose=False) +finally: + try: + # Bring down the interfaces + subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 down")) + subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo down")) + # Delete the namespace + subprocess.check_call(shlex.split("sudo ip netns delete scapy")) + # Remove the virtual Ethernet pair + subprocess.check_call(shlex.split("sudo ip link delete veth0")) + except subprocess.CalledProcessError as e: + print(f"Error during cleanup: {e}") + +len(res) == 1000 + diff --git a/test/testsocket.py b/test/testsocket.py index 9709a99a350..d1a90a4da51 100644 --- a/test/testsocket.py +++ b/test/testsocket.py @@ -9,7 +9,6 @@ import time import random -from socket import socket from threading import Lock from scapy.config import conf @@ -25,10 +24,14 @@ Tuple, Any, List, - cast, ) from scapy.supersocket import SuperSocket +from scapy.plist import ( + PacketList, + SndRcvList, +) + open_test_sockets = list() # type: List[TestSocket] @@ -59,6 +62,25 @@ def __exit__(self, exc_type, exc_value, traceback): """Close the socket""" self.close() + def sr(self, *args, **kargs): + # type: (Any, Any) -> Tuple[SndRcvList, PacketList] + """Send and Receive multiple packets + """ + from scapy import sendrecv + return sendrecv.sndrcv(self, *args, threaded=False, **kargs) + + def sr1(self, *args, **kargs): + # type: (Any, Any) -> Optional[Packet] + """Send one packet and receive one answer + """ + from scapy import sendrecv + ans = sendrecv.sndrcv(self, *args, threaded=False, **kargs)[0] # type: SndRcvList + if len(ans) > 0: + pkt = ans[0][1] # type: Packet + return pkt + else: + return None + def close(self): # type: () -> None global open_test_sockets