Python模拟发送SOME/IP消息(Scapy模块)
Python模拟服务端或客户端发送SOME/IP消息
1 说明
主要介绍如何使用Python+scapy模块进行SOME/IP以及SOME/IP-SD消息的发送和解析;关于SOME/IP-SD协议的介绍可以查看我的另一篇文章:SOME/IP服务发现协议说明-SD,以及关于SOME/IP协议的介绍可以查看文章:SOME/IP协议说明
2 依赖
3 服务端
3.1 导包
from scapy.all import *
from scapy.contrib.automotive.someip import *
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
import time
注意:在scapy的模块 scapy.all
中导入了一个交互式的模块 from scapy.autorun import *
,该模块会导致无法打包;如果你需要使用Pyinstaller或其他工具将编写好的脚本打包成EXE之后再执行的话,需要自己处理一下该模块的代码,或者可以直接屏蔽(# from scapy.autorun import *
)该导入(如果不需要使用交互式功能的话);
3.2 参数定义
定义协议需要使用的一些参数,如网卡名称、MAC地址、IP地址、端口号等;
# 注意以下参数需要根据自己的实际情况配置,否则可能无法通信成功
NETWORK_ADAPTER = "Intel(R) Wi-Fi 6 AX201 160MHz" # 定义要使用的网卡
LOCAL_MAC = "01:02:03:04:05:06" # 定义网卡对应的MAC地址
LOCAL_IP = "127.0.0.1" # 定义网卡对应的IP地址(注意此处最好设置为固定的IP地址)
BROADCAST_MAC = "a1:a2:a3:a4:00:30" # 定义广播(多播)消息时要广播的目的MAC地址
BROADCAST_IP = "239.192.255.250" # 定义广播(多播)消息时的目的IP地址
BROADCAST_PORT = 30490 # 定义发送广播消息时使用的源端口号和目的端口号,SOME/IP-SD协议通常默认使用30490
SESSION_ID_OFFER = 1 # 定义用于发送offer时的session id
3.3 提供服务(Offer Service)
模拟一个ECU的服务端,向外部(其他ECU或客户端或服务)广播发送Offer(提供的服务均使用UDP通信时),用于提供可订阅的服务;
from scapy.all import *
from scapy.contrib.automotive.someip import *
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
import time
# 注意以下参数需要根据自己的实际情况配置,否则可能无法通信成功
NETWORK_ADAPTER = "Intel(R) Wi-Fi 6 AX201 160MHz" # 定义要使用的网卡
LOCAL_MAC = "01:02:03:04:05:06" # 定义网卡对应的MAC地址
LOCAL_IP = "127.0.0.1" # 定义网卡对应的IP地址(注意此处最好设置为固定的IP地址)
BROADCAST_MAC = "a1:a2:a3:a4:00:30" # 定义广播(多播)消息时要广播的目的MAC地址
BROADCAST_IP = "239.192.255.250" # 定义广播(多播)消息时的目的IP地址
BROADCAST_PORT = 30490 # 定义发送广播消息时使用的源端口号和目的端口号,SOME/IP-SD协议通常默认使用30490
SESSION_ID_OFFER = 1 # 定义用于发送offer时的session id
# 构建一个SOME/IP头,构建的原则以及参数设置请参考文档开头引用的另外两个文章
def some_ip_header():
some_ip = SOMEIP()
some_ip.srv_id = 0xffff
some_ip.sub_id = 0x1 # 在Scapy的someip模块中,该参数为1表示需要使用类型为Event,参数为0表示需要使用类型为Method
# some_ip.method_id = 0x8100
some_ip.event_id = 0x8100
some_ip.session_id = SESSION_ID_OFFER
some_ip.msg_type = SOMEIP.TYPE_NOTIFICATION
some_ip.retcode = SOMEIP.RET_E_OK
SESSION_ID_OFFER += 1
if SESSION_ID_OFFER > 0xffff:
SESSION_ID_OFFER = 1
return some_ip
# 构建一个SOME/IP-SD头以及Entries列表和Options列表,构建的原则以及参数设置请参考文档开头引用的另外两个文章
def some_ip_sd():
# 构建Options
option1 = SDOption_IP4_EndPoint() # Option有多种类型,请根据协议规定以及业务需求选择自己需要的类型并配置
option1.addr = LOCAL_IP
option1.l4_proto = 0x06 # 0x06: TCP, 0x11: UDP
option1.port = 50001 # 该端口应该是实际业务层定义的服务端的服务端口,请按照实际情况配置
option2 = SDOption_IP4_EndPoint() # Option有多种类型,请根据协议规定以及业务需求选择自己需要的类型并配置
option2.addr = LOCAL_IP
option2.l4_proto = 0x06 # 0x06: TCP, 0x11: UDP
option2.port = 50003 # 该端口应该是实际业务层定义的服务端的服务端口,请按照实际情况配置
option3 = SDOption_IP4_EndPoint() # Option有多种类型,请根据协议规定以及业务需求选择自己需要的类型并配置
option3.addr = LOCAL_IP
option3.l4_proto = 0x11 # 0x06: TCP, 0x11: UDP
option3.port = 50003 # 该端口应该是实际业务层定义的服务端的服务端口,请按照实际情况配置
option_array = [option1, option2, option3]
# 构建Entries
entry1 = SDEntry_Service()
entry1.type = SDENTRY_TYPE_SRV_OFFERSERVICE
entry1.index_1 = 0x0 # 引用option_array中的option1,该参数是第一个要引用的位置
entry1.n_opt_1 = 0x1 # 只引用一个,即option1
entry1.index_2 = 0x0
entry1.n_opt_2 = 0x0
entry1.srv_id = 0x1020 # 该服务ID(Service ID)为具体业务中定义的,请按照实际情况配置
entry1.inst_id = 0x01 # 该实例ID(Instance ID)为具体业务中定义的,请按照实际情况配置
entry1.major_ver = 0x1
entry1.ttl = 0x3 # TTL为3表示该服务(Offer)的声明周期为3s,如果设置为0则表示停止提供服务(Stop Offer Service)
entry1.minor_ver = 0x1
entry2 = SDEntry_Service()
entry2.type = SDENTRY_TYPE_SRV_OFFERSERVICE
entry2.index_1 = 0x1 # 引用option_array中的option2和option3,该参数是第一个要引用的位置
entry2.n_opt_1 = 0x2 # 引用2个,即option2, option3
entry2.index_2 = 0x0
entry2.n_opt_2 = 0x0
entry2.srv_id = 0x3040 # 该服务ID(Service ID)为具体业务中定义的,请按照实际情况配置
entry2.inst_id = 0x01 # 该实例ID(Instance ID)为具体业务中定义的,请按照实际情况配置
entry2.major_ver = 0x1
entry2.ttl = 0x3 # TTL为3表示该服务(Offer)的声明周期为3s,如果设置为0则表示停止提供服务(Stop Offer Service)
entry2.minor_ver = 0x1
entry_array = [entry1, entry2]
# 构建SOME/IP-SD消息
sd = SD()
sd.flags = 0xC0 # 该配置涉及重启标志位和单播标志位,具体详情请查看文章开头的引用文档
sd.entry_array = entry_array[:]
sd.option_array = option_array[:]
return sd
if __name__ == "__main__":
# 以下组包即发送方式仅作为示例,实际情况中需要进行封装并可能需要开启线程进行处理
sd = some_ip_sd() # sd消息通常是固定的,因此只需要在初始的时候获取一次
for i in range(100): # 此处只循环发送100次,实际情况可能需要自己新建一个线程一直发送
some_ip = some_ip_header() # 每次发送均需要动态获取someip头,目的是保证每个someip头中的session id字段都会增加1,以符合协议规定
# 最终发出的消息需要通过分层组装的方式将不同层进行不同的设置,最终组合成一个消息发送,以下方式只适用于UDP发送,使用TCP时请不要这样组包
package = Ether(dst=BROADCAST_MAC) / IP(src=LOCAL_IP, dst=BROADCAST_IP) / UDP(sport=BROADCAST_PORT, dport=BROADCAST_PORT) / some_ip / sd
sendp(package, iface=NETWORK_ADAPTER)
time.sleep(1) # 按照协议要求,通常每个Offer的间隔为1s
3.4 订阅ACK/NACK(Subscribe ACK/NACK)
from scapy.all import *
from scapy.contrib.automotive.someip import *
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
import time
# 注意以下参数需要根据自己的实际情况配置,否则可能无法通信成功
NETWORK_ADAPTER = "Intel(R) Wi-Fi 6 AX201 160MHz" # 定义要使用的网卡
LOCAL_MAC = "01:02:03:04:05:06" # 定义网卡对应的MAC地址
LOCAL_IP = "127.0.0.1" # 定义网卡对应的IP地址(注意此处最好设置为固定的IP地址)
BROADCAST_MAC = "a1:a2:a3:a4:00:30" # 定义广播(多播)消息时要广播的目的MAC地址
BROADCAST_IP = "239.192.255.250" # 定义广播(多播)消息时的目的IP地址
BROADCAST_PORT = 30490 # 定义发送广播消息时使用的源端口号和目的端口号,SOME/IP-SD协议通常默认使用30490
SESSION_ID_OFFER = 1 # 定义用于发送offer时的session id
# 以下函数send_subscribe_ack用于返回订阅ACK或NACK的响应消息
# 参数pkg为服务端接收到的someip消息或someip-sd消息,其类型为Packet,是已经经过scapy的someip模块解析过后的数据结构,可以直接访问其中的字段(无需额外的解析过程)
def send_subscribe_ack(pkg: Packet):
src_mac = pkg.src # 收到的包的源MAC地址(即订阅者的MAC地址),如:02:03:04:05:00:101
src_ip = pkg[IP].src # 收到的包的源IP地址(即订阅者的IP地址)
some_ip = pkg[SOMEIP] # 收到的包中的SOMEIP层(该层中包含了SOME/IP-SD层)
new_sd = []
# 将收到的SD包中的entry列表中的每个事件组(EventGroup)修改为订阅响应ACK
for ent in pkg[SD].entry_array:
# 以下是需要作为ACK响应时需要修改的项,其他参数保持不变即可
ent.type = 0x07 # 0x06: Subscribe, 0x07: ACK
# 注意:订阅的ACK响应消息不需要引用option列表,因此以下4个参数直接设置0即可
ent.index_1 = 0
ent.index_2 = 0
ent.n_opt_1 = 0
ent.n_opt_2 = 0
ent.ttl = 0x0 # TTL=0时表示返回的是NACK,如果需要返回的是ACK则不要设置此项
new_sd.append(ent)
some_ip.len = int(some_ip.len) - int(some_ip[SD].len_option_array) # SOME/IP的长度需要减去option列表的长度,因为响应不需要附带option列表
some_ip[SD].entry_array = new_sd # 用新组建的entry列表替换原来的列表
# 将option列表清空,同时长度设置为0
some_ip[SD].len_option_array = 0
some_ip[SD].option_array = []
# 以下的some_ip中实际包含了一个SOME/IP层和一个SOME/IP-SD层
package = Ether(dst=src_mac) / IP(src=LOCAL_IP, dst=src_ip) / UDP(sport=BROADCAST_PORT, dport=BROADCAST_PORT) / some_ip
sendp(package, iface=NETWORK_ADAPTER)
# 该函数主要用于过滤收到的订阅消息并进行响应,其余消息则需要忽略
def filter_subscribe(pkg: Packet):
# 对于非UDP也不是TCP的数据包直接忽略
if UDP not in pkg and TCP not in pkg:
return
if SD in pkg: # pkg包中有SOME/IP-SD协议层,表示是一个服务发现的消息(可能是Offer, Stop Offer, Subscribe, Stop Subscribe, Subscribe ACK等)
for ent in pkg[SD].entry_array:
if int(ent.type) == 0x06: # 收到的是订阅消息或停止订阅消息
if int(ent.ttl) > 0: # 收到的是订阅消息
send_subscribe_ack(pkg)
return
else: # 收到的是停止订阅消息,暂不做处理
pass
# 以下函数receive用于接收网络上的所有消息,并按照目的IP地址过滤(过滤出目的IP地址是服务端地址:LOCAL_IP 或是广播地址:BROADCAST_IP 的消息)
# 注意,通常该函数中的sniff捕获函数需要在单独的线程里面,不然可能影响其他消息的收发
def receive():
# filter参数:用于指定过滤规则
# iface参数:指定监听的网卡
# prn参数:用于指定每一个过滤之后的数据包需要用哪个函数来处理,过滤之后的包会作为参数传递给该指定的函数
# count参数:设置要捕获的包的个数,0表示一直捕获,这种情况下最好使用线程处理
sniff(filter=f'dst host {LOCAL_IP} or dst host {BROADCAST_IP}', iface=NETWORK_ADAPTER, prn=filter_subscribe, count=0)
if __name__ == "__main__":
receive()
3.5 通知(Notifier)
from scapy.all import *
from scapy.contrib.automotive.someip import *
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
import time
# 注意以下参数需要根据自己的实际情况配置,否则可能无法通信成功
NETWORK_ADAPTER = "Intel(R) Wi-Fi 6 AX201 160MHz" # 定义要使用的网卡
LOCAL_MAC = "01:02:03:04:05:06" # 定义网卡对应的MAC地址
LOCAL_IP = "127.0.0.1" # 定义网卡对应的IP地址(注意此处最好设置为固定的IP地址)
BROADCAST_MAC = "a1:a2:a3:a4:00:30" # 定义广播(多播)消息时要广播的目的MAC地址
BROADCAST_IP = "239.192.255.250" # 定义广播(多播)消息时的目的IP地址
BROADCAST_PORT = 30490 # 定义发送广播消息时使用的源端口号和目的端口号,SOME/IP-SD协议通常默认使用30490
SESSION_ID_OFFER = 1 # 定义用于发送offer时的session id
# 使用一个SUBSCRIBED参数记录订阅成功的事件组的信息,以及订阅者的MAC地址、IP地址、端口号(服务端端口号以及客户端端口号)等;注意
# 该信息应该在【3.4】章节中订阅成功返回ACK的同时进行记录,且每次收到订阅后要更新记录,如果超过TTL的时间后没有收到订阅则应该清除该
# 客户端的订阅状态以及订阅信息,以下字典中记录了信息即表示订阅成功状态,超时未订阅则删除对应信息,表示未订阅状态;实际情况中大家也
# 可以根据具体情况增加标志位用于记录,或者使用其他任何形式记录,不一定是字典
# 同时需要注意,通常一个服务端(ECU)会提供多个服务(Service ID),每个服务对应一个或多个事件组(EventGroup),每个事件组对应一个或
# 多个方法(Method ID,或者也可以成为Element ID),在订阅时,通常只订阅事件组,而不会直接订阅方法
# 以下仅为示例,实际需要根据业务需求确定具体的ID以及对应关系
SUBSCRIBED = {
"0x0102": {
"mac": "01:02:03:04:05:06",
"ip": "193.128.1.1",
"src_port": 50010, # 源端口号为服务端发送需要的端口号,具体业务中需要自己定义
"dst_port": 50011, # 目的地端口号为客户端端口号,具体业务中需要自己定义
"event_group": {
"0x5001": {
"0x0001": 1, # 每个已订阅的事件组的方法需要独立记录一个Session ID的值,从初始值1开始
"0x0002": 1,
"0x0003": 1
},
"0x5002": {
"0x0004": 1
}
}
},
"0x0304": {
"mac": "a1:b2:c3:d4:e5:f6",
"ip": "193.128.1.2",
"src_port": 50012, # 源端口号为服务端发送需要的端口号,具体业务中需要自己定义
"dst_port": 50013, # 目的地端口号为客户端端口号,具体业务中需要自己定义
"event_group": {
"0x5010": {
"0x0001": 1, # 每个已订阅的事件组的方法需要独立记录一个Session ID的值,从初始值1开始
"0x0002": 1,
"0x0003": 1
},
"0x5011": {
"0x0004": 1,
"0x0005": 1
}
}
}
}
def send_notifier():
for srv_id, srv in SUBSCRIBED.items():
dst_mac = srv.get("mac")
dst_ip = srv.get("ip")
src_port = srv.get("src_port")
dst_port = srv.get("dst_port")
for group_id, group in srv["event_group"].items():
for method_id, session_id in group[group_id].items():
# 开始针对每个方法(Method或Element)创建通知消息(Notifier)
notifier = SOMEIP()
notifier.srv_id = int(srv_id, 16)
notifier.sub_id = 0x1 # 在Scapy的someip模块中,该参数为1表示需要使用类型为Event,参数为0表示需要使用类型为Method
# some_ip.method_id = 0x8100
notifier.event_id = int(method_id, 16)
notifier.client_id = 0x00
notifier.session_id = session_id
notifier.msg_type = SOMEIP.TYPE_NOTIFICATION
notifier.retcode = SOMEIP.RET_E_OK
# 发送notifier的时候需要附加payload信息,即携带一些数据发送给订阅者;这些数据的具体定义也是根据实际业务定义的,常见的类型可以参考文档开头
# 的文档链接;如可能是一个int类型的枚举值(int类型需要先转换成十六进制字符串),或是一个结构体,或是字符串等
notifier.add_payload(bytes.fromhex("01a2b3"))
package = Ether(dst=dst_mac) / IP(src=LOCAL_IP, dst=dst_ip) / UDP(sport=src_port, dport=dst_port) / notifier
sendp(package, iface=NETWORK_ADAPTER)
# 更新该方法(Method)的session id的值,并更新到字典中,方便下次使用时是最新的值
session_id += 1
if session_id > 0xffff:
session_id = 1
SUBSCRIBED[srv_id][event_group][group_id][method_id] = session_id
if __name__ == "__main__":
# 本例子中只调用了一次函数将所有订阅过的事件组(EventGroup)通知(Notifier)发送一次,但实际情况可能是要周期性发送,那就需要通过线程、协程等来
# 管理通知的发送
send_notifier()
3.6 请求/响应ACK(Response ACK)
from scapy.all import *
from scapy.contrib.automotive.someip import *
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
import time
import socket
# 注意以下参数需要根据自己的实际情况配置,否则可能无法通信成功
NETWORK_ADAPTER = "Intel(R) Wi-Fi 6 AX201 160MHz" # 定义要使用的网卡
LOCAL_MAC = "01:02:03:04:05:06" # 定义网卡对应的MAC地址
LOCAL_IP = "127.0.0.1" # 定义网卡对应的IP地址(注意此处最好设置为固定的IP地址)
BROADCAST_MAC = "a1:a2:a3:a4:00:30" # 定义广播(多播)消息时要广播的目的MAC地址
BROADCAST_IP = "239.192.255.250" # 定义广播(多播)消息时的目的IP地址
BROADCAST_PORT = 30490 # 定义发送广播消息时使用的源端口号和目的端口号,SOME/IP-SD协议通常默认使用30490
SESSION_ID_OFFER = 1 # 定义用于发送offer时的session id
TCP_SERVER = None # 创建的TCP连接的服务,用于监听客户端连接
TCP_CLIENT = None # 已建立连接的客户端句柄
def send_response_ack(pkg):
src_mac = pkg.src # 02:03:04:05:00:101
src_ip = pkg[IP].src
src_port = pkg[TCP].sport
dst_port = pkg[TCP].dport
# 先获取TCP的payload,在通过SOMEIP模块进行解析后访问
payload = bytes(pkg[TCP].payload)
some_ip = SOMEIP(payload)
# 复制一个收到并解析后的SOMEIP包,并直接在该包上做修改后作为响应发送给请求的客户端;也可以自己创建一个新的并直接给各个字段赋值
new_some_ip = some_ip.copy()
new_some_ip.len = 16
new_some_ip.msg_type = SOMEIP.TYPE_RESPONSE
new_some_ip.retcode = 0x00
new_some_ip.remove_payload() # 先删除原有的payload
new_some_ip.add_payload(bytes.fromhex('0000000000000000')) # 再重新添加需要发送的新的payload,其中的值根据具体业务定义,此处仅举例说明
# 使用已经创建的TCP连接发送ACK响应消息
TCP_CLIENT.send(bytes(new_some_ip))
# todo, 发送ACK响应消息后,可能需要自动发送一个关联的通知消息(Notifier),但该通知消息需要使用TCP的方式进行发送,其余部分可以参考【3.5】章节
# todo, 其中具体关联方式、发送使用TCP还是UDP等,均需要根据实际的业务约定来决定
# 该函数主要用于过滤收到的请求消息并进行响应,其余消息则需要忽略
def filter_request(pkg: Packet):
# 对于非UDP也不是TCP的数据包直接忽略
if UDP not in pkg and TCP not in pkg:
return
if SD in pkg: # pkg包中有SOME/IP-SD协议层,暂不处理
return # 此处应该增加订阅处理逻辑,参考【3.4】章节,订阅成功后才能继续后续的业务请求(Request)
if TCP in pkg: # pkg包是TCP协议的数据
# 获取TCP的payload,UDP协议的包可以直接组包然后发送,接收时也会自动解析,可以直接通过分层访问并获取字段值;而TCP的包在发送时必须
# 将SOMEIP包转换成字节码,在作为TCP的payload发送,接收时也同样需要先获取TCP的payload,在通过SOMEIP模块进行解析后访问
send_response_ack(pkg)
return
if UDP in pkg: # pkg收到的请求也可能是UDP协议的,这部分处理与TCP类似,只是在组包与发包时不同,组包与发包参考上述章节
return
# 以下函数receive用于接收网络上的所有消息,并按照目的IP地址过滤(过滤出目的IP地址是服务端地址:LOCAL_IP 或是广播地址:BROADCAST_IP 的消息)
# 注意,通常该函数中的sniff捕获函数需要在单独的线程里面,不然可能影响其他消息的收发
def receive():
# filter参数:用于指定过滤规则
# iface参数:指定监听的网卡
# prn参数:用于指定每一个过滤之后的数据包需要用哪个函数来处理,过滤之后的包会作为参数传递给该指定的函数
# count参数:设置要捕获的包的个数,0表示一直捕获,这种情况下最好使用线程处理
sniff(filter=f'dst host {LOCAL_IP} or dst host {BROADCAST_IP}', iface=NETWORK_ADAPTER, prn=filter_request, count=0)
if __name__ == "__main__":
# todo, 第一步:首先需要广播发送Offer,建议可以创建一个线程专门发送Offer
# todo, 第二步:建立TCP的服务并开始监听,等待客户端连接;实际情况中可能需要为多个服务(Service)的不同端口创建多个TCP服务,因此实际情况中也通常建议使用单独的线程创建多个TCP服务并监听,此处示例暂时只使用一个TCP
TCP_SERVER = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
TCP_SERVER.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
TCP_SERVER.bind((LOCAL_IP, 50001)) # 此处的端口是根据业务配置的,请设置为自己的值
TCP_SERVER.listen(1)
# 等待客户端连接
TCP_CLIENT, address = TCP_SERVER.accept()
# 开始接收消息
receive()
作者:李星星BruceL