diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..77d35e9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.tox +.coverage +*.pyc +*.swp +htmlcov/ +reports/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..d484a0b --- /dev/null +++ b/README.md @@ -0,0 +1,32 @@ +# eth-scapy-someip +Automotive Ethernet SOME/IP-SD Scapy protocol + +## 1. Description +**eth-scapy-someip** is a Scapy extension implementing Autosar's SOME/IP-SD protocol, giving any developer with Python knowledge an essential and powerful tool to develop Automotive Ethernet applications for the automotive world. + +Test automation, traffic generation, ECU development support or just **_for fun_** fiddling is all possible with *eth-scapy-someip*. + +## 2. Configuration + +### 2.1 VLAN +In order to configure VLAN (IEEE 802.1q) tagging in your linux machine, Ubuntu's wiki is a good reference : https://wiki.ubuntu.com/vlan. + +### 2.1 Interface configuration (Linux) +Feel free to choose your preferred network topology in order to start fiddling with SOME/IP-SD. In our case, we opted for a couple of USB-Ethernet adaptors but that's not strictly necessary. + +Just keep in mind that these conventions are used from the _example collection_: +- ETH_IFACE_A (normally acting as _sender_) + - iface name : eth1.10 + - iface addr : 192.168.10.2 + - iface port : 30490 +- ETH_IFACE_B (normally acting as _receiver_) + - iface name : eth2.10 + - iface addr : 192.168.10.3 + - iface port : 30490 + +## 3. Examples +This folder contains a (hopefully growing) examples collection, build upon unittest package just for convenience. Just fire Wireshark up and enjoy analyzing generated traffic. + +## 4. References +- https://www.autosar.org +- http://www.secdev.org/projects/scapy/ diff --git a/dist/eth_scapy_someip-0.0.1-py2-none-any.whl b/dist/eth_scapy_someip-0.0.1-py2-none-any.whl new file mode 100644 index 0000000..e2ae2b6 Binary files /dev/null and b/dist/eth_scapy_someip-0.0.1-py2-none-any.whl differ diff --git a/docs/AUTOSAR_TR_SomeIpExample.pdf b/docs/AUTOSAR_TR_SomeIpExample.pdf new file mode 100644 index 0000000..27079d2 Binary files /dev/null and b/docs/AUTOSAR_TR_SomeIpExample.pdf differ diff --git a/docs/Scapy.pdf b/docs/Scapy.pdf new file mode 100644 index 0000000..bcd045c Binary files /dev/null and b/docs/Scapy.pdf differ diff --git a/eth_scapy_someip/__init__.py b/eth_scapy_someip/__init__.py new file mode 100644 index 0000000..1d81c75 --- /dev/null +++ b/eth_scapy_someip/__init__.py @@ -0,0 +1,7 @@ +from .eth_scapy_someip import SOMEIP +from .eth_scapy_sd import SD + +from scapy.packet import bind_layers + +# Layer binding +bind_layers(SOMEIP,SD) diff --git a/eth_scapy_someip/eth_scapy_sd.py b/eth_scapy_someip/eth_scapy_sd.py new file mode 100644 index 0000000..e28c364 --- /dev/null +++ b/eth_scapy_someip/eth_scapy_sd.py @@ -0,0 +1,346 @@ +from scapy.fields import * +from scapy.packet import * +from scapy.all import * +from scapy.layers.inet6 import IP6Field +import ctypes +import collections +from .eth_scapy_someip import SOMEIP + + +class _SDPacketBase(Packet): + """ base class to be used among all SD Packet definitions.""" + # use this dictionary to set default values for desired fields (mostly on subclasses + # where not all fields are defined locally) + # - key : field_name, value : desired value + # - it will be used from 'init_fields' function, upon packet initialization + # + # example : _defaults = {'field_1_name':field_1_value,'field_2_name':field_2_value} + _defaults = {} + + def _set_defaults(self): + """ goes through '_defaults' dict setting field default values (for those that have been defined).""" + for key in self._defaults.keys(): + try: + self.get_field(key) + except KeyError: + pass + else: + self.setfieldval(key, self._defaults[key]) + + def init_fields(self): + """ perform initialization of packet fields with desired values. + NOTE : this funtion will only be called *once* upon class (or subclass) construction + """ + Packet.init_fields(self) + self._set_defaults() + + +# SD ENTRY +# - Service +# - EventGroup +class _SDEntry(_SDPacketBase): + """ Base class for SDEntry_* packages.""" + TYPE_FMT = ">B" + TYPE_PAYLOAD_I = 0 + # ENTRY TYPES : SERVICE + TYPE_SRV_FINDSERVICE = 0x00 + TYPE_SRV_OFFERSERVICE = 0x01 + TYPE_SRV = (TYPE_SRV_FINDSERVICE, TYPE_SRV_OFFERSERVICE) + # ENTRY TYPES : EVENGROUP + TYPE_EVTGRP_SUBSCRIBE = 0x06 + TYPE_EVTGRP_SUBSCRIBE_ACK = 0x07 + TYPE_EVTGRP = (TYPE_EVTGRP_SUBSCRIBE, TYPE_EVTGRP_SUBSCRIBE_ACK) + # overall len (UT usage) + OVERALL_LEN = 16 + + fields_desc = [ + ByteField("type", 0), + ByteField("index_1", 0), + ByteField("index_2", 0), + BitField("n_opt_1", 0, 4), + BitField("n_opt_2", 0, 4), + ShortField("srv_id", 0), + ShortField("inst_id", 0), + ByteField("major_ver", 0), + X3BytesField("ttl", 0)] + + def guess_payload_class(self, payload): + """ decode SDEntry depending on its type.""" + pl_type = struct.unpack(_SDEntry.TYPE_FMT, payload[_SDEntry.TYPE_PAYLOAD_I:_SDEntry.TYPE_PAYLOAD_I+1])[0] + if pl_type in _SDEntry.TYPE_SRV: + return SDEntry_Service + elif pl_type in _SDEntry.TYPE_EVTGRP: + return SDEntry_EventGroup + + +class SDEntry_Service(_SDEntry): + """ Service Entry.""" + _defaults = {"type": _SDEntry.TYPE_SRV_FINDSERVICE} + + name = "Service Entry" + fields_desc = [ + _SDEntry, + IntField("minor_ver", 0)] + + +class SDEntry_EventGroup(_SDEntry): + """ EventGroup Entry.""" + _defaults = {"type": _SDEntry.TYPE_EVTGRP_SUBSCRIBE} + + name = "Eventgroup Entry" + fields_desc = [ + _SDEntry, + BitField("res", 0, 12), + BitField("cnt", 0, 4), + ShortField("eventgroup_id", 0)] + + +# SD Option +# - Configuration +# - LoadBalancing +# - IPv4 EndPoint +# - IPv6 EndPoint +# - IPv4 MultiCast +# - IPv6 MultiCast +# - IPv4 EndPoint +# - IPv6 EndPoint +class _SDOption(_SDPacketBase): + """ Base class for SDOption_* packages.""" + TYPE_FMT = ">B" + TYPE_PAYLOAD_I = 2 + + CFG_TYPE = 0x01 + CFG_OVERALL_LEN = 4 # overall length of CFG SDOption,empty 'cfg_str' (to be used from UT) + LOADBALANCE_TYPE = 0x02 + LOADBALANCE_LEN = 0x05 + LOADBALANCE_OVERALL_LEN = 8 # overall length of LB SDOption (to be used from UT) + IP4_ENDPOINT_TYPE = 0x04 + IP4_ENDPOINT_LEN = 0x0009 + IP4_MCAST_TYPE = 0x14 + IP4_MCAST_LEN = 0x0009 + IP4_SDENDPOINT_TYPE = 0x24 + IP4_SDENDPOINT_LEN = 0x0009 + IP4_OVERALL_LEN = 12 # overall length of IP4 SDOption (to be used from UT) + IP6_ENDPOINT_TYPE = 0x06 + IP6_ENDPOINT_LEN = 0x0015 + IP6_MCAST_TYPE = 0x16 + IP6_MCAST_LEN = 0x0015 + IP6_SDENDPOINT_TYPE = 0x26 + IP6_SDENDPOINT_LEN = 0x0015 + IP6_OVERALL_LEN = 24 # overall length of IP6 SDOption (to be used from UT) + + def guess_payload_class(self, payload): + """ decode SDOption depending on its type.""" + pl_type = struct.unpack(_SDOption.TYPE_FMT, payload[_SDOption.TYPE_PAYLOAD_I:_SDOption.TYPE_PAYLOAD_I+1])[0] + + if pl_type == _SDOption.CFG_TYPE: + return SDOption_Config + elif pl_type == self.LOADBALANCE_TYPE: + return SDOption_LoadBalance + elif pl_type == self.IP4_ENDPOINT_TYPE: + return SDOption_IP4_EndPoint + elif pl_type == self.IP4_MCAST_TYPE: + return SDOption_IP4_Multicast + elif pl_type == self.IP4_SDENDPOINT_TYPE: + return SDOption_IP4_SD_EndPoint + elif pl_type == self.IP6_ENDPOINT_TYPE: + return SDOption_IP6_EndPoint + elif pl_type == self.IP6_MCAST_TYPE: + return SDOption_IP6_Multicast + elif pl_type == self.IP6_SDENDPOINT_TYPE: + return SDOption_IP6_SD_EndPoint + + +class _SDOption_Header(_SDOption): + fields_desc = [ + ShortField("len", None), + ByteField("type", 0), + ByteField("res_hdr", 0)] + + +class _SDOption_Tail(_SDOption): + fields_desc = [ + ByteField("res_tail", 0), + ByteEnumField("l4_proto", 0x06, {0x06: "TCP", 0x11: "UDP"}), + ShortField("port", 0)] + + +class _SDOption_IP4(_SDOption): + fields_desc = [ + _SDOption_Header, + IPField("addr", "0.0.0.0"), + _SDOption_Tail] + + +class _SDOption_IP6(_SDOption): + fields_desc = [ + _SDOption_Header, + IP6Field("addr", "2001:cdba:0000:0000:0000:0000:3257:9652"), + _SDOption_Tail] + + +class SDOption_Config(_SDOption): + # offset to be added upon length calculation (corresponding to header's "Reserved" field) + LEN_OFFSET = 0x01 + + name = "Config Option" + # default values specification + _defaults = {'type': _SDOption.CFG_TYPE} + # package fields definiton + fields_desc = [ + _SDOption_Header, + StrField("cfg_str", "")] + + def post_build(self, p, pay): + # length computation excluding 16b_length and 8b_type + l = self.len + if l is None: + l = len(self.cfg_str) + self.LEN_OFFSET + p = struct.pack("!H", l) + p[2:] + return p + pay + + +class SDOption_LoadBalance(_SDOption): + name = "LoadBalance Option" + # default values specification + _defaults = {'type': _SDOption.LOADBALANCE_TYPE, + 'len': _SDOption.LOADBALANCE_LEN} + # package fields definiton + fields_desc = [ + _SDOption_Header, + ShortField("priority", 0), + ShortField("weight", 0)] + + +# SDOPTIONS : IPv4-specific +class SDOption_IP4_EndPoint(_SDOption_IP4): + name = "IP4 EndPoint Option" + # default values specification + _defaults = {'type': _SDOption.IP4_ENDPOINT_TYPE, 'len': _SDOption.IP4_ENDPOINT_LEN} + + +class SDOption_IP4_Multicast(_SDOption_IP4): + name = "IP4 Multicast Option" + # default values specification + _defaults = {'type': _SDOption.IP4_MCAST_TYPE, 'len': _SDOption.IP4_MCAST_LEN} + + +class SDOption_IP4_SD_EndPoint(_SDOption_IP4): + name = "IP4 SDEndPoint Option" + # default values specification + _defaults = {'type': _SDOption.IP4_SDENDPOINT_TYPE, 'len': _SDOption.IP4_SDENDPOINT_LEN} + + +# SDOPTIONS : IPv6-specific +class SDOption_IP6_EndPoint(_SDOption_IP6): + name = "IP6 EndPoint Option" + # default values specification + _defaults = {'type': _SDOption.IP6_ENDPOINT_TYPE, 'len': _SDOption.IP6_ENDPOINT_LEN} + + +class SDOption_IP6_Multicast(_SDOption_IP6): + name = "IP6 Multicast Option" + # default values specification + _defaults = {'type': _SDOption.IP6_MCAST_TYPE, 'len': _SDOption.IP6_MCAST_LEN} + + +class SDOption_IP6_SD_EndPoint(_SDOption_IP6): + name = "IP6 SDEndPoint Option" + # default values specification + _defaults = {'type': _SDOption.IP6_SDENDPOINT_TYPE, 'len': _SDOption.IP6_SDENDPOINT_LEN} + + +# +# SD PACKAGE DEFINITION +# +class SD(_SDPacketBase): + """ + SD Packet + + NOTE : when adding 'entries' or 'options', do not use list.append() method but create a new list + e.g. : p = SD() + p.option_array = [SDOption_Config(),SDOption_IP6_EndPoint()] + """ + SOMEIP_MSGID_SRVID = 0xffff + SOMEIP_MSGID_SUBID = 0x1 + SOMEIP_MSGID_EVENTID = 0x100 + SOMEIP_PROTO_VER = 0x01 + SOMEIP_IFACE_VER = 0x01 + SOMEIP_MSG_TYPE = SOMEIP.TYPE_NOTIFICATION + + name = "SD" + # Flags definition: {"name":(mask,offset)} + _sdFlag = collections.namedtuple('Flag', 'mask offset') + FLAGSDEF = { + "REBOOT": _sdFlag(mask=0x80, offset=7), # ReBoot flag + "UNICAST": _sdFlag(mask=0x40, offset=6) # UniCast flag + } + + fields_desc = [ + ByteField("flags", 0), + X3BytesField("res", 0), + FieldLenField("len_entry_array", None, length_of="entry_array", fmt="!I"), + PacketListField("entry_array", None, cls=_SDEntry, length_from=lambda pkt:pkt.len_entry_array), + FieldLenField("len_option_array", None, length_of="option_array", fmt="!I"), + PacketListField("option_array", None, cls=_SDOption, length_from=lambda pkt:pkt.len_option_array)] + + def __init__(self, *args, **kwargs): + super(SD, self).__init__(*args, **kwargs) + self.explicit = 1 + + def getFlag(self, name): + """ get particular flag from bitfield.""" + name = name.upper() + if name in self.FLAGSDEF: + return (self.flags & self.FLAGSDEF[name].mask) >> self.FLAGSDEF[name].offset + else: + return None + + def setFlag(self, name, value): + """ + Set particular flag on bitfield. + :param str name : name of the flag to set (see SD.FLAGSDEF) + :param int value : either 0x1 or 0x0 (provided int will be ANDed with 0x01) + """ + name = name.upper() + if name in self.FLAGSDEF: + self.flags = ((self.flags & ctypes.c_ubyte(~self.FLAGSDEF[name].mask).value) | + (value & 0x01) << self.FLAGSDEF[name].offset) + + def setEntryArray(self, entry_list): + """ + Add entries to entry_array. + :param entry_list: list of entries to be added. Single entry object also accepted + """ + if isinstance(entry_list, list): + self.entry_array = entry_list + else: + self.entry_array = [entry_list] + + def setOptionArray(self, option_list): + """ + Add options to option_array. + :param option_list: list of options to be added. Single option object also accepted + """ + if isinstance(option_list, list): + self.option_array = option_list + else: + self.option_array = [option_list] + + def getSomeip(self, stacked=False): + """ + return SD-initialized SOME/IP packet + :param stacked: boolean. Either just SOME/IP packet or stacked over SD-self + """ + p = SOMEIP() + p.msg_id.srv_id = SD.SOMEIP_MSGID_SRVID + p.msg_id.sub_id = SD.SOMEIP_MSGID_SUBID + p.msg_id.event_id = SD.SOMEIP_MSGID_EVENTID + p.proto_ver = SD.SOMEIP_PROTO_VER + p.iface_ver = SD.SOMEIP_IFACE_VER + p.msg_type = SD.SOMEIP_MSG_TYPE + + if stacked: + return p / self + else: + return p diff --git a/eth_scapy_someip/eth_scapy_someip.py b/eth_scapy_someip/eth_scapy_someip.py new file mode 100644 index 0000000..9798229 --- /dev/null +++ b/eth_scapy_someip/eth_scapy_someip.py @@ -0,0 +1,141 @@ +from scapy.all import * +from scapy.fields import * +from scapy.packet import * + +"""SOMEIP PACKAGE DEFINITION""" + + +class _SOMEIP_MessageId(Packet): + """MessageId subpacket.""" + name = 'MessageId' + fields_desc = [ + ShortField('srv_id', 0), + BitEnumField('sub_id', 0, 1, {0: 'METHOD_ID', 1: 'EVENT_ID'}), + ConditionalField(BitField('method_id', 0, 15), lambda pkt: pkt.sub_id == 0), + ConditionalField(BitField('event_id', 0, 15), lambda pkt: pkt.sub_id == 1) + ] + + def extract_padding(self, p): + return '', p + + +class _SOMEIP_RequestId(Packet): + """ RequestId subpacket.""" + name = 'RequestId' + fields_desc = [ + ShortField('client_id', 0), + ShortField('session_id', 0)] + + def extract_padding(self, p): + return '', p + + +class SOMEIP(Packet): + """ SOME/IP Packet.""" + # Default values + PROTOCOL_VERSION = 0x01 + INTERFACE_VERSION = 0x01 + + # Lenght offset (without payload) + LEN_OFFSET = 0x08 + + # SOME/IP TYPE VALUES + TYPE_REQUEST = 0x00 + TYPE_REQUEST_NO_RET = 0x01 + TYPE_NOTIFICATION = 0x02 + TYPE_REQUEST_ACK = 0x40 + TYPE_REQUEST_NORET_ACK = 0x41 + TYPE_NOTIFICATION_ACK = 0x42 + TYPE_RESPONSE = 0x80 + TYPE_ERROR = 0x81 + TYPE_RESPONSE_ACK = 0xc0 + TYPE_ERROR_ACK = 0xc1 + + # SOME/IP-TP TYPE VALUES + TYPE_REQUEST_SEGMENT = 0x20 + TYPE_REQUEST_NO_RET_SEGMENT = 0x21 + TYPE_NOTIFICATION_SEGMENT = 0x22 + TYPE_REQUEST_ACK_SEGMENT = 0x60 + TYPE_REQUEST_NORET_ACK_SEGMENT = 0x61 + TYPE_NOTIFICATION_ACK_SEGMENT = 0x62 + TYPE_RESPONSE_SEGMENT = 0xa0 + TYPE_ERROR_SEGMENT = 0xa1 + TYPE_RESPONSE_ACK_SEGMENT = 0xe0 + TYPE_ERROR_ACK_SEGMENT = 0xe1 + SOMEIP_TP_TYPES = frozenset({TYPE_REQUEST_SEGMENT, TYPE_REQUEST_NO_RET_SEGMENT, TYPE_NOTIFICATION_SEGMENT, + TYPE_REQUEST_ACK_SEGMENT, TYPE_REQUEST_NORET_ACK_SEGMENT, + TYPE_NOTIFICATION_ACK_SEGMENT, TYPE_RESPONSE_SEGMENT, TYPE_ERROR_SEGMENT, + TYPE_RESPONSE_ACK_SEGMENT, TYPE_ERROR_ACK_SEGMENT}) + SOMEIP_TP_TYPE_BIT_MASK = 0x20 + + # SOME/IP RETURN CODES + RET_E_OK = 0x00 + RET_E_NOT_OK = 0x01 + RET_E_UNKNOWN_SERVICE = 0x02 + RET_E_UNKNOWN_METHOD = 0x03 + RET_E_NOT_READY = 0x04 + RET_E_NOT_REACHABLE = 0x05 + RET_E_TIMEOUT = 0x06 + RET_E_WRONG_PROTOCOL_V = 0x07 + RET_E_WRONG_INTERFACE_V = 0x08 + RET_E_MALFORMED_MSG = 0x09 + RET_E_WRONG_MESSAGE_TYPE = 0x0a + + # SOME/IP-TP More Segments Flag + SOMEIP_TP_LAST_SEGMENT = 0 + SOMEIP_TP_MORE_SEGMENTS = 1 + + _OVERALL_LEN_NOPAYLOAD = 16 # UT + + name = 'SOME/IP' + + fields_desc = [ + PacketField('msg_id', _SOMEIP_MessageId(), _SOMEIP_MessageId), # MessageID + IntField('len', None), # Length + PacketField('req_id', _SOMEIP_RequestId(), _SOMEIP_RequestId), # RequestID + ByteField('proto_ver', PROTOCOL_VERSION), # Protocol version + ByteField('iface_ver', INTERFACE_VERSION), # Interface version + ByteEnumField('msg_type', TYPE_REQUEST, { # -- Message type -- + TYPE_REQUEST: 'REQUEST', # 0x00 + TYPE_REQUEST_NO_RET: 'REQUEST_NO_RETURN', # 0x01 + TYPE_NOTIFICATION: 'NOTIFICATION', # 0x02 + TYPE_REQUEST_ACK: 'REQUEST_ACK', # 0x40 + TYPE_REQUEST_NORET_ACK: 'REQUEST_NO_RETURN_ACK', # 0x41 + TYPE_NOTIFICATION_ACK: 'NOTIFICATION_ACK', # 0x42 + TYPE_RESPONSE: 'RESPONSE', # 0x80 + TYPE_ERROR: 'ERROR', # 0x81 + TYPE_RESPONSE_ACK: 'RESPONSE_ACK', # 0xc0 + TYPE_ERROR_ACK: 'ERROR_ACK', # 0xc1 + }), + ByteEnumField('retcode', 0, { # -- Return code -- + RET_E_OK: 'E_OK', # 0x00 + RET_E_NOT_OK: 'E_NOT_OK', # 0x01 + RET_E_UNKNOWN_SERVICE: 'E_UNKNOWN_SERVICE', # 0x02 + RET_E_UNKNOWN_METHOD: 'E_UNKNOWN_METHOD', # 0x03 + RET_E_NOT_READY: 'E_NOT_READY', # 0x04 + RET_E_NOT_REACHABLE: 'E_NOT_REACHABLE', # 0x05 + RET_E_TIMEOUT: 'E_TIMEOUT', # 0x06 + RET_E_WRONG_PROTOCOL_V: 'E_WRONG_PROTOCOL_VERSION', # 0x07 + RET_E_WRONG_INTERFACE_V: 'E_WRONG_INTERFACE_VERSION', # 0x08 + RET_E_MALFORMED_MSG: 'E_MALFORMED_MESSAGE', # 0x09 + RET_E_WRONG_MESSAGE_TYPE: 'E_WRONG_MESSAGE_TYPE', # 0x0a + }), + ConditionalField(BitField('offset', 0, 28), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES), + ConditionalField(BitField('reserved', 0, 3), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES), + ConditionalField(BitEnumField('more_segments', 0, 1, {SOMEIP_TP_LAST_SEGMENT: 'Last_Segment', + SOMEIP_TP_MORE_SEGMENTS: 'More_Segments' + }), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES) + ] + + def post_build(self, p, pay): + length = self.len + # length computation : RequestID + PROTOVER_IFACEVER_TYPE_RETCODE + PAYLOAD + if length is None: + length = self.LEN_OFFSET + len(pay) + p = p[:4] + struct.pack('!I', length) + p[8:] + return p + pay + + +for i in range(15): + bind_layers(UDP, SOMEIP, sport=30490 + i) + bind_layers(TCP, SOMEIP, sport=30490 + i) diff --git a/examples/ex_00_basics.py b/examples/ex_00_basics.py new file mode 100644 index 0000000..e67f1d9 --- /dev/null +++ b/examples/ex_00_basics.py @@ -0,0 +1,20 @@ +import unittest +from scapy.all import * + +class ex_00_basics(unittest.TestCase): + def setUp(self): + pass + def tearDown(self): + pass + + def test_00(self): + """ ping google.""" + p = IP(dst="www.google.com")/ICMP() + ans,unans = sr(p,timeout=1) + self.assertTrue(len(unans) == 0) + + def _test_01(self): + """ DNS query.""" + p = IP(dst="8.8.8.8")/UDP()/DNS(rd=1,qd=DNSQR(qname="www.slashdot.org")) + p_rcv = sr1(p) + p_rcv.show() diff --git a/examples/ex_01_someip.py b/examples/ex_01_someip.py new file mode 100644 index 0000000..ff4fe65 --- /dev/null +++ b/examples/ex_01_someip.py @@ -0,0 +1,52 @@ +import unittest +from collections import namedtuple +from scapy.all import * + +from eth_scapy_someip import eth_scapy_someip as someip +from eth_scapy_someip import eth_scapy_sd as sd + +iface = namedtuple('iface','name ip port') +ETH_IFACE_A = iface(name='eth1.10', ip='192.168.10.2', port=30490) +ETH_IFACE_B = iface(name='eth2.10', ip='192.168.10.3', port=30490) + +class ex_01_someip(unittest.TestCase): + def setUp(self): + pass + def tearDown(self): + pass + + def test_00(self): + """ SOME/IP magic cookie (client >> server). TR_SOMEIP_00159.""" + # build SOME/IP packet + sip = someip.SOMEIP() + sip.msg_id.srv_id = 0xffff + sip.msg_id.sub_id = 0x0 + sip.msg_id.method_id = 0x0000 + + sip.req_id.client_id = 0xdead + sip.req_id.session_id = 0xbeef + + sip.msg_type = 0x01 + sip.retcode = 0x00 + + # send message + p = Ether()/IP(src=ETH_IFACE_B.ip,dst=ETH_IFACE_A.ip)/UDP(sport=30490,dport=30490)/sip + + sendp(p,iface=ETH_IFACE_B.name) + + def test_01(self): + """ SOME/IP-SD : Example for a serialization protocol, 6.7.3.7 Example of SOME/IP-SD PDU.""" + # build SOME/IP-SD packet + sdp = sd.SD() + + sdp.flags = 0x80 + sdp.entry_array = [ + sd.SDEntry_Service(type=sd.SDEntry_Service.TYPE_SRV_FINDSERVICE,srv_id=0x4711,inst_id=0xffff,major_ver=0xff,ttl=3600,minor_ver=0xffffffff), + sd.SDEntry_Service(type=sd.SDEntry_Service.TYPE_SRV_OFFERSERVICE,n_opt_1=1,srv_id=0x1234,inst_id=0x0001,major_ver=0x01,ttl=3,minor_ver=0x00000032)] + sdp.option_array = [ + sd.SDOption_IP4_EndPoint(addr="192.168.0.1",l4_proto=0x11,port=0xd903)] + + # SEND MESSAGE + sip = Ether()/IP(src=ETH_IFACE_B.ip,dst=ETH_IFACE_A.ip)/UDP(sport=ETH_IFACE_B.port,dport=ETH_IFACE_A.port)/sdp.getSomeip(stacked=True) + sendp(sip,iface=ETH_IFACE_B.name) + diff --git a/examples/ex_02_sd.py b/examples/ex_02_sd.py new file mode 100644 index 0000000..4c2c54d --- /dev/null +++ b/examples/ex_02_sd.py @@ -0,0 +1,51 @@ +import unittest +import threading +from collections import namedtuple +from scapy.all import * + +from eth_scapy_someip import eth_scapy_someip as someip +from eth_scapy_someip import eth_scapy_sd as sd + +iface = namedtuple('iface','name ip port') +ETH_IFACE_A = iface(name='eth1.10', ip='192.168.10.2', port=30490) +ETH_IFACE_B = iface(name='eth2.10', ip='192.168.10.3', port=30490) + +class ex_02_sd(unittest.TestCase): + def setUp(self): + pass + def tearDown(self): + pass + + def _test_00_sender(self,p): + """ sender thread : immediately send provided packet.""" + _ = srp1(p,iface = ETH_IFACE_A.name,timeout=5) + def _test_00_rcver(self,s,p): + """ receiver thread : wait s seconds and send reply.""" + time.sleep(s) + sendp(p,iface = ETH_IFACE_A.name) + def test_00(self): + """ + SOME/IP-SD Subscribe eventgroup + """ + # build SOME/IP-SD packet + sdp = sd.SD() + + sdp.flags = 0x00 + sdp.entry_array = [ + sd.SDEntry_EventGroup(srv_id=0x1111,n_opt_1=1,inst_id=0x2222,major_ver=0x03,eventgroup_id=0x04,cnt=0x0,ttl=0x05)] + sdp.option_array = [ + sd.SDOption_IP4_EndPoint(addr="192.168.0.1",l4_proto=0x11,port=0xd903)] + + # build request and reply packages + p = Ether()/IP(src=ETH_IFACE_A.ip,dst=ETH_IFACE_B.ip)/UDP(sport=ETH_IFACE_A.port,dport=ETH_IFACE_B.port)/sdp.getSomeip(True) + r = Ether()/IP(src=ETH_IFACE_B.ip,dst=ETH_IFACE_A.ip)/UDP(sport=ETH_IFACE_B.port,dport=ETH_IFACE_A.port)/sdp.getSomeip(True) + r['SD'].entry_array[0].type = sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE_ACK + + # 'dummy-ly' use a couple of threads to emulate traffic + t_send = threading.Thread(name='sender',target=self._test_00_sender,args=(p,)) + t_rcv = threading.Thread(name='receiver',target=self._test_00_rcver,args=(2,r,)) + t_send.start() + t_rcv.start() + t_send.join() + t_rcv.join() + diff --git a/examples/examples_main.py b/examples/examples_main.py new file mode 100644 index 0000000..891e163 --- /dev/null +++ b/examples/examples_main.py @@ -0,0 +1,20 @@ +import unittest +import HtmlTestRunner +from ex_00_basics import ex_00_basics +from ex_01_someip import ex_01_someip +from ex_02_sd import ex_02_sd + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ex_00_basics)) + suite.addTest(unittest.makeSuite(ex_01_someip)) + suite.addTest(unittest.makeSuite(ex_02_sd)) + return suite + +if __name__=='__main__': + # uncomment for HTML testrunner output + #runner = HtmlTestRunner.HTMLTestRunner(output='examples') + + runner = unittest.TextTestRunner() + test_suite = suite() + runner.run(test_suite) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..71cf9c4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +pytest +pytest-cov +scapy>=2.4.1 +wheel==0.24.0 +junit-xml==1.8 +coverage +html-testRunner==1.1.2 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..d48f379 --- /dev/null +++ b/setup.py @@ -0,0 +1,11 @@ +from setuptools import setup + +setup( + name="eth_scapy_someip", + version="0.3.0", + author="Jose Amores", + description="Automotive Ethernet SOME/IP-SD Scapy protocol", + license="MIT", + py_modules=[], + packages=['eth_scapy_someip'], +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_sd.py b/tests/test_sd.py new file mode 100644 index 0000000..e8b83e2 --- /dev/null +++ b/tests/test_sd.py @@ -0,0 +1,246 @@ +import binascii +import struct +import sys +import os +import pytest + +from eth_scapy_someip import eth_scapy_someip as someip +from eth_scapy_someip import eth_scapy_sd as sd + +HERE = os.path.dirname(os.path.realpath(__file__)) + +def test_00_SDEntry_Service(): + p = sd.SDEntry_Service() + + # packet length + assert(len(binascii.hexlify(bytes(p)))/2 == sd._SDEntry.OVERALL_LEN) + + # fields' setting + p.type = sd._SDEntry.TYPE_SRV_OFFERSERVICE + p.index_1 = 0x11 + p.index_2 = 0x22 + p.srv_id = 0x3333 + p.inst_id = 0x4444 + p.major_ver = 0x55 + p.ttl = 0x666666 + p.minor_ver = 0xdeadbeef + + p_str = binascii.hexlify(bytes(p)) + bin_str = b'011122003333444455666666deadbeef' + assert(p_str == bin_str) + + # fields' setting : N_OPT + # value above 4 bits, serialized packet should feature 0x1 and 0x2 + del(p) + p = sd.SDEntry_Service() + p.n_opt_1 = 0xf1 + p.n_opt_2 = 0xf2 + p_str = binascii.hexlify(bytes(p)) + bin_str = b'00'*3+b'12'+b'00'*12 + assert(p_str == bin_str) + assert(len(p_str)/2 == sd._SDEntry.OVERALL_LEN) + + # Payload guess + p_entry = sd._SDEntry() + p_entry_srv = sd.SDEntry_Service() + + assert(p_entry.guess_payload_class(bytes(p_entry_srv)) == sd.SDEntry_Service) + + +def test_01_SDEntry_EventGroup(): + p = sd.SDEntry_EventGroup() + + # packet length + assert(len(binascii.hexlify(bytes(p)))/2 == sd._SDEntry.OVERALL_LEN) + + # fields' setting + p.index_1 = 0x11 + p.index_2 = 0x22 + p.srv_id = 0x3333 + p.inst_id = 0x4444 + p.major_ver = 0x55 + p.ttl = 0x666666 + p.cnt = 0x7 + p.eventgroup_id = 0x8888 + + p_str = binascii.hexlify(bytes(p)) + bin_str = b'06112200333344445566666600078888' + assert(p_str == bin_str) + + # Payload guess + p_entry = sd._SDEntry() + p_entry_evtgrp = sd.SDEntry_EventGroup() + + assert(p_entry.guess_payload_class(bytes(p_entry_evtgrp)) == sd.SDEntry_EventGroup) + +def test_02_SDOption_Config(): + p = sd.SDOption_Config() + + # pkg type + assert(p.type == sd._SDOption.CFG_TYPE) + # length without payload + assert(len(binascii.hexlify(bytes(p)))/2 == sd._SDOption.CFG_OVERALL_LEN) + + # add payload and check length + p.cfg_str = "5abc=x7def=1230" + assert(binascii.hexlify(bytes(p)) == b'00100100'+binascii.hexlify(b'5abc=x7def=1230')) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_Config) + +def test_03_SDOption_LoadBalance(): + p = sd.SDOption_LoadBalance() + + # pkg type & lengths (static and overall) + assert(p.type == sd._SDOption.LOADBALANCE_TYPE) + assert(p.len == sd._SDOption.LOADBALANCE_LEN) + assert(len(binascii.hexlify(bytes(p)))/2 == sd._SDOption.LOADBALANCE_OVERALL_LEN) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_LoadBalance) + +def test_04_SDOption_IP4_EndPoint(): + p = sd.SDOption_IP4_EndPoint() + + # pkg type & length + assert(p.type == sd._SDOption.IP4_ENDPOINT_TYPE) + assert(p.len == sd._SDOption.IP4_ENDPOINT_LEN) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_IP4_EndPoint) + +def test_05_SDOption_IP4_Multicast(): + p = sd.SDOption_IP4_Multicast() + + # pkg type & length + assert(p.type == sd._SDOption.IP4_MCAST_TYPE) + assert(p.len == sd._SDOption.IP4_MCAST_LEN) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_IP4_Multicast) + +def test_06_SDOption_IP4_SD_EndPoint(): + p = sd.SDOption_IP4_SD_EndPoint() + + # pkg type & length + assert(p.type == sd._SDOption.IP4_SDENDPOINT_TYPE) + assert(p.len == sd._SDOption.IP4_SDENDPOINT_LEN) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_IP4_SD_EndPoint) + +def test_07_SDOption_IP6_EndPoint(): + p = sd.SDOption_IP6_EndPoint() + + # pkg type & length + assert(p.type == sd._SDOption.IP6_ENDPOINT_TYPE) + assert(p.len == sd._SDOption.IP6_ENDPOINT_LEN) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_IP6_EndPoint) + +def test_08_SDOption_IP6_Multicast(): + p = sd.SDOption_IP6_Multicast() + + # pkg type & length + assert(p.type == sd._SDOption.IP6_MCAST_TYPE) + assert(p.len == sd._SDOption.IP6_MCAST_LEN) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_IP6_Multicast) + +def test_09_SDOption_IP6_SD_EndPoint(): + p = sd.SDOption_IP6_SD_EndPoint() + + # pkg type & length + assert(p.type == sd._SDOption.IP6_SDENDPOINT_TYPE) + assert(p.len == sd._SDOption.IP6_SDENDPOINT_LEN) + + # Payload guess + p_option = sd._SDOption() + assert(p_option.guess_payload_class(bytes(p)) == sd.SDOption_IP6_SD_EndPoint) + +def test_0a_SD_Flags(): + p = sd.SD() + + p.setFlag("REBOOT",1) + assert(p.flags == 0x80) + assert(p.getFlag("REBOOT") == 1) + p.setFlag("REBOOT",0) + assert(p.flags == 0x00) + assert(p.getFlag("REBOOT") == 0) + p.setFlag("UNICAST",1) + assert(p.flags == 0x40) + assert(p.getFlag("UNICAST") == 1) + p.setFlag("UNICAST",0) + assert(p.flags == 0x00) + assert(p.getFlag("UNICAST") == 0) + + p.setFlag("REBOOT",1) + p.setFlag("UNICAST",1) + assert(p.flags == 0xc0) + assert(p.getFlag("REBOOT") == 1) + assert(p.getFlag("UNICAST") == 1) + + # non-existing Flag + assert(p.getFlag('NON_EXISTING_FLAG') == None) + +def test_0b_SD_GetSomeipPacket(): + p_sd = sd.SD() + + sd_len = binascii.hexlify(bytes(p_sd)) + + p_someip = p_sd.getSomeip() + assert(len(binascii.hexlify(bytes(p_someip)))/2, someip.SOMEIP._OVERALL_LEN_NOPAYLOAD) + + p = p_sd.getSomeip(stacked=True) + assert(len(binascii.hexlify(bytes(p)))/2, someip.SOMEIP._OVERALL_LEN_NOPAYLOAD + 12) + + +def test_0c_SD(): + p = sd.SD() + + # length of package without entries nor options + assert(len(binascii.hexlify(bytes(p)))/2 == 12) + + # some Entries to array and size check + p.setEntryArray([sd.SDEntry_Service(),sd.SDEntry_EventGroup()]) + assert(struct.unpack("!L",bytes(p)[4:8])[0] == 32) + # make sure individual entry added as list + p.setEntryArray(sd.SDEntry_Service()) + assert(isinstance(p.entry_array,list)) + assert(len(p.entry_array) == 1) + # empty entry array + p.setEntryArray([]) + assert(struct.unpack("!L",bytes(p)[4:8])[0] == 0) + + + # some Options to array and size check + p.setOptionArray([sd.SDOption_IP4_EndPoint(),sd.SDOption_IP4_EndPoint()]) + assert(struct.unpack("!L",bytes(p)[8:12])[0] == 24) + # make sure individual option added as list + p.setOptionArray(sd.SDOption_IP4_EndPoint()) + assert(isinstance(p.option_array,list)) + assert(len(p.option_array) == 1) + # empty option array + p.setOptionArray([]) + assert(struct.unpack("!L",bytes(p)[8:12])[0] == 0) + + # some Entries&Options to array and size check + p.setEntryArray([sd.SDEntry_Service(),sd.SDEntry_EventGroup()]) + p.setOptionArray([sd.SDOption_IP4_EndPoint(),sd.SDOption_IP4_EndPoint()]) + assert(struct.unpack("!L",bytes(p)[4:8])[0] == 32) + assert(struct.unpack("!L",bytes(p)[40:44])[0] == 24) + +class _SDOption_IP4_EndPoint_defaults(sd._SDOption_IP4): + name = "IP4 Endpoint Option (UT)" + _defaults = {'non_existing_key':'does_not_matter_value'} +def test_0d_defaults(): + p = _SDOption_IP4_EndPoint_defaults() diff --git a/tests/test_someip.py b/tests/test_someip.py new file mode 100644 index 0000000..492b95e --- /dev/null +++ b/tests/test_someip.py @@ -0,0 +1,121 @@ +import binascii +import struct +import codecs +import sys +import os +import pytest + +from eth_scapy_someip import eth_scapy_someip as someip + +HERE = os.path.dirname(os.path.realpath(__file__)) + +def test_00_MessageId(): + """ test MessageId subpackage.""" + p = someip._SOMEIP_MessageId() + p.srv_id = 0x1111 + p.method_id = 0x0222 + p.event_id = 0x0333 + + # MessageId with 'method_id' + p.sub_id = 0 + # service id (!H : bigendian unsigned short) + assert(struct.unpack("!H", bytes(p)[:2])[0] == 0x1111) + # make sure sub_id == 0 (!B : bigendian unsigned char) + assert((struct.unpack("!B", bytes(p)[2:3])[0] & 0x80) == 0x00) + # method id (!H : bigendian unsigned short) + assert((struct.unpack("!H", bytes(p)[2:4])[0] & ~0x8000) == 0x0222) + # overall subpackage contents + assert(binascii.hexlify(bytes(p)) == b'11110222') + + # MessageId with 'event_id' + p.sub_id = 1 + # service id (!H : bigendian unsigned short) + assert(struct.unpack("!H", bytes(p)[:2])[0] == 0x1111) + # make sure sub_id == 1 (!B : bigendian unsigned char) + assert((struct.unpack("!B", bytes(p)[2:3])[0] & 0x80) == 0x80) + # event id (!H : bigendian unsigned short) + assert((struct.unpack("!H", bytes(p)[2:4])[0] & ~0x8000) == 0x0333) + # overall subpackage contents + assert(binascii.hexlify(bytes(p)) == b'11118333') + +def test_01_RequestId(): + """ test RequestId subpackage.""" + p = someip._SOMEIP_RequestId() + p.client_id = 0x1111 + p.session_id = 0x2222 + + # ClientID + assert(struct.unpack("!H", bytes(p)[:2])[0] == 0x1111) + # SessionID + assert(struct.unpack("!H", bytes(p)[2:4])[0] == 0x2222) + # overall subpackage contents + assert(binascii.hexlify(bytes(p)) == b'11112222') + +def test_02_SOMEIP(): + """ test SOMEIP packet : overall, payload and length.""" + someip_p = someip.SOMEIP() + + # overall package (with default values) + pstr = binascii.hexlify(bytes(someip_p)) + binstr = b'00'*4 + b'00'*3 + b'08' + b'00'*4 + b'01010000' + assert(pstr == binstr) + + # add payload and check length + p = someip_p / binascii.unhexlify("DEADBEEF") + pstr = binascii.hexlify(bytes(p)) + binstr = b'00'*4 + b'00'*3 + b'0c' + b'00'*4 + b'01010000' + b'deadbeef' + assert(pstr == binstr) + + # empty payload, recheck dynamic length calculation + p.remove_payload() + pstr = binascii.hexlify(bytes(p)) + binstr = b'00'*4 + b'00'*3 + b'08' + b'00'*4 + b'01010000' + assert(pstr == binstr) + +def test_03_SOMEIP_SubPackages(): + """ test SOMEIP packet : MessageId and RequestId subpackages.""" + p = someip.SOMEIP() + + # MessageId subpackage + p.msg_id.srv_id = 0x1111 + p.msg_id.method_id = 0x0222 + p.msg_id.event_id = 0x0333 + + p.msg_id.sub_id = 0 + pstr = binascii.hexlify(bytes(p)) + binstr = b'11110222' + b'00'*3 + b'08' + b'00'*4 + b'01010000' + assert(pstr == binstr) + + p.msg_id.sub_id = 1 + pstr = binascii.hexlify(bytes(p)) + binstr = b'11118333' + b'00'*3 + b'08' + b'00'*4 + b'01010000' + assert(pstr == binstr) + + # RequestId subpackage + del (p) + p = someip.SOMEIP() + p.req_id.client_id = 0x1111 + p.req_id.session_id = 0x2222 + + pstr = binascii.hexlify(bytes(p)) + binstr = b'00'*4 + b'00'*3 + b'08' + b'11112222' + b'01010000' + assert(pstr == binstr) + +def test_04_SOMEIP_Fields(): + """ test SOMEIP packet : defaults.""" + p = someip.SOMEIP() + + # default values + assert(p.proto_ver == someip.SOMEIP.PROTOCOL_VERSION) + assert(p.iface_ver == someip.SOMEIP.INTERFACE_VERSION) + assert(p.msg_type == someip.SOMEIP.TYPE_REQUEST) + assert(p.retcode == someip.SOMEIP.RET_E_OK) + +def test_05_SOMEIP_TP(): + hex_stream = b'07d000640000040c000100030101220000001801' + b'0'*2048 + + someip_tp_p = someip.SOMEIP(codecs.decode(hex_stream,'hex_codec')) + assert(someip_tp_p.msg_type in someip.SOMEIP.SOMEIP_TP_TYPES) + assert(someip_tp_p.offset == 384) + assert(someip_tp_p.reserved == 0) + assert(someip_tp_p.more_segments == someip.SOMEIP.SOMEIP_TP_MORE_SEGMENTS) \ No newline at end of file diff --git a/tests/test_someip_sd.py b/tests/test_someip_sd.py new file mode 100644 index 0000000..cd968c2 --- /dev/null +++ b/tests/test_someip_sd.py @@ -0,0 +1,29 @@ +import binascii +import struct +import sys +import os +import pytest + +from eth_scapy_someip import eth_scapy_someip as someip +from eth_scapy_someip import eth_scapy_sd as sd + +HERE = os.path.dirname(os.path.realpath(__file__)) + +def test_00_SOMEIPSD(): + p_sd = sd.SD() + p_someip = p_sd.getSomeip() + + # check SOME/IP-SD defaults + assert(binascii.hexlify(bytes(p_someip.msg_id)) == b'ffff8100') + assert(p_someip.msg_type == someip.SOMEIP.TYPE_NOTIFICATION) + + # length of SOME/IP-SD without entries nor options + p = p_someip/p_sd + assert(struct.unpack("!L",bytes(p)[4:8])[0] == 20) + + # check SOME/IP-SD lengths (note : lengths calculated on package construction) + del(p) + p_sd.setEntryArray([sd.SDEntry_Service()]) + p_sd.setOptionArray([sd.SDOption_IP4_EndPoint()]) + p = p_someip/p_sd + assert(struct.unpack("!L",bytes(p)[4:8])[0] == 48) \ No newline at end of file diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..1552168 --- /dev/null +++ b/tox.ini @@ -0,0 +1,36 @@ +[tox] +envlist = clean,py27,py38,py{27,38}-examples + +[testenv] +whitelist_externals = + sudo +deps = + -rrequirements.txt +commands = + pytest -s -o log_cli=true \ + --cov=eth_scapy_someip --cov-config=tox.ini \ + --cov-report=html \ + --cov-report=term + +# TODO : improve this 'section duplication' to execute tests with several interpreters (py27,py38...) +[testenv:py27-examples] +deps = + -rrequirements.txt +commands = + sudo {envpython} ./examples/examples_main.py +[testenv:py38-examples] +deps = + -rrequirements.txt +commands = + sudo {envpython} ./examples/examples_main.py + +[testenv:clean] +commands = + coverage erase + +# COVERAGE options +[coverage:run] +parallel = True +[coverage:html] +directory = tests/reports/htmlcov +