Initial commit

This commit is contained in:
zaksabeast 2022-01-07 22:33:24 -06:00
commit 5b64c07277
8 changed files with 4810 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
# Ghidra py.class
*.class

181
Label3dsCTRFns.py Normal file
View File

@ -0,0 +1,181 @@
# Labels 3ds CTR lib functions
# @category 3ds
#
from collections import Counter
from ghidra_utils import bytesToByteArr, nameCtrFunc, findFirstByteArray, readBytes, getCallArgs, getValueFromConstantVarnode, getOrCreateNamespace
from ctr_services import getCommandName, service_handle_names
listing = currentProgram.getListing()
memory = currentProgram.getMemory()
def makeIpcHeader(command_id, normal_params, translate_params):
return command_id << 16 | (normal_params & 0x3f) << 6 | translate_params & 0x3f
def parseIpcVarNode(varnode):
addr = varnode.getAddress()
if addr.isRegisterAddress():
return None
if addr.isConstantAddress():
return addr.getOffset()
return memory.getInt(addr)
def nameIpcFunc(ipc_func, ipc_header, handle):
func_name = getCommandName(ipc_header, handle)
nameCtrFunc(ipc_func, func_name)
def nameIpcWrapperFunc(ipc_func, ipc_header, handle):
func_name = getCommandName(ipc_header, handle)
nameCtrFunc(ipc_func, func_name + '_wrapper')
def getConstantFromMov(mov_addr):
inst = listing.getInstructionContaining(mov_addr)
if inst is not None:
pcode_ops = inst.getPcode()
for pcode_op in pcode_ops:
if pcode_op.getMnemonic() == 'COPY':
varnode = pcode_op.getInputs()[0]
return getValueFromConstantVarnode(varnode)
return None
def getIpcHeaderFromAddr(addr):
ipc_header = getConstantFromMov(addr)
if ipc_header is None:
ipc_store_refs = getReferencesFrom(addr)
if len(ipc_store_refs) > 0:
ipc_header_addr = ipc_store_refs[0].getToAddress()
ipc_header = memory.getInt(ipc_header_addr)
return ipc_header
ctr_namespace = getOrCreateNamespace('ctr')
handles_by_caller_funcs = {}
for handle_name in service_handle_names:
handles = getSymbols(handle_name, ctr_namespace)
for handle in handles:
refs = getReferencesTo(handle.getAddress())
for ref in refs:
handle_name = handle.getName()
from_addr = ref.getFromAddress()
func_ref = getFunctionContaining(from_addr)
if func_ref is not None:
func_addr = func_ref.getEntryPoint()
handles_by_caller_funcs[func_addr] = handle_name
def getHandleOfIpcCommand(ipc_func):
return handles_by_caller_funcs.get(ipc_func.getEntryPoint())
# ---------------------------------------------------------------------------
# IPC functions using ipc_set_header
ipc_set_header_bytes = [0x10, 0xb5, 0x04, 0x46, 0x08, 0x46, 0x11, 0x46, 0x1a, 0x46, 0x02, 0x9b, 0xff, 0xff, 0xff, 0xff, 0x21, 0x68, 0x08, 0x60, 0x10, 0xbd]
ipc_set_header_mask = [0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]
ipc_set_header_addr = findFirstByteArray(ipc_set_header_bytes, ipc_set_header_mask)
if ipc_set_header_addr is None:
ipc_make_header_refs = []
else:
ipc_make_header_refs = getReferencesTo(ipc_set_header_addr)
if ipc_make_header_refs is None:
ipc_make_header_refs = []
result = []
for ref in ipc_make_header_refs:
func = getFunctionContaining(ref.getFromAddress())
if func is not None:
result.append(func)
else:
print('Cannot find function for {}. Could this be part of a service command handler jump table?'.format(ref.getFromAddress()))
ipc_make_header_refs = result
dupliate_ipc_make_header_refs = []
unique_ipc_make_header_refs = []
for ref, count in Counter(ipc_make_header_refs).items():
if count > 1:
dupliate_ipc_make_header_refs.append(ref)
else:
unique_ipc_make_header_refs.append(ref)
# If a function sets a header multiple times, it's likely a command handler
for ipc_func in dupliate_ipc_make_header_refs:
nameCtrFunc(ipc_func, 'handleServiceCommand')
for ipc_func in unique_ipc_make_header_refs:
args = getCallArgs(ipc_func, ipc_set_header_addr)
if len(args) < 4:
print('Bad number of args at {}'.format(addr))
else:
command_id = parseIpcVarNode(args[1])
normal_params = parseIpcVarNode(args[2])
translate_params = parseIpcVarNode(args[3])
if command_id is not None and normal_params is not None and translate_params is not None:
ipc_header = makeIpcHeader(command_id, normal_params, translate_params)
handle = getHandleOfIpcCommand(ipc_func)
nameIpcFunc(ipc_func, ipc_header, handle)
# ---------------------------------------------------------------------------
# IPC functions using inlined svc_sync_sync_request
svc_send_sync_request = bytesToByteArr([0x32, 0x00, 0x00, 0xef])
svc_send_sync_request_mask = bytesToByteArr([0xff, 0xff, 0xff, 0xff])
func_start_bytes = bytesToByteArr([0x00, 0x00, 0x0d, 0xe9])
func_start_mask = bytesToByteArr([0x00, 0x00, 0x0f, 0xff])
get_thread_local_storage_bytes = bytesToByteArr([0x70, 0x4f, 0x1d, 0xee])
get_thread_local_storage_mask = bytesToByteArr([0xff, 0x0f, 0xff, 0xff])
start_address = memory.getExecuteSet().getMaxAddress()
while start_address is not None:
svc_send_sync_request_addr = memory.findBytes(start_address, svc_send_sync_request, svc_send_sync_request_mask, False, monitor)
if svc_send_sync_request_addr is None:
# No more functions
break
if readBytes(svc_send_sync_request_addr.add(4), 4) == bytesToByteArr([0x1e, 0xff, 0x2f, 0xe1]):
# Non-inlined svcSendSyncRequest
start_address = None
continue
ipc_func_addr = memory.findBytes(svc_send_sync_request_addr, func_start_bytes, func_start_mask, False, monitor)
if ipc_func_addr is None:
# No more functions
break
wrapper_func = None
ipc_func = getFunctionContaining(ipc_func_addr)
if ipc_func is None:
# Bad data
# We should probably check the section before assuming this
break
if not ipc_func.getEntryPoint().equals(ipc_func_addr):
wrapper_func = ipc_func
ipc_func = createFunction(ipc_func_addr, 'UNKNOWN_CTR_IPC_FN')
ipc_func_body = ipc_func.getBody()
ipc_func_start = ipc_func_body.getMinAddress()
ipc_func_end = ipc_func_body.getMaxAddress()
get_thread_local_storage_addr = memory.findBytes(ipc_func_start, ipc_func_end, get_thread_local_storage_bytes, get_thread_local_storage_mask, True, monitor)
ipc_store_header_addr = get_thread_local_storage_addr.add(4)
ipc_header = None
while ipc_header is None and ipc_store_header_addr < svc_send_sync_request_addr:
ipc_header = getIpcHeaderFromAddr(ipc_store_header_addr)
ipc_store_header_addr = ipc_store_header_addr.add(4)
if ipc_header is None:
print('Cannot find ipc header for {}!'.format(ipc_func_addr))
continue
handle = getHandleOfIpcCommand(ipc_func)
nameIpcFunc(ipc_func, ipc_header, handle)
if wrapper_func is not None:
nameIpcWrapperFunc(wrapper_func, ipc_header, handle)
start_address = ipc_func_addr.subtract(4)

48
Label3dsHandles.py Normal file
View File

@ -0,0 +1,48 @@
# Labels 3ds CTR lib handles
# @category 3ds
#
from ghidra.program.model.symbol import SourceType
from ghidra_utils import getOrCreateNamespace, getCallArgsFromRef
from ctr_services import getServiceHandleName, getServiceCallerRefs
memory = currentProgram.getMemory()
# Find service refs
service_caller_refs = getServiceCallerRefs()
# Find service handles
ctr_namespace = getOrCreateNamespace('ctr')
srv_namespace = getOrCreateNamespace('srv', ctr_namespace)
possible_get_service_handle_direct_symbols = getSymbols('GetServiceHandleDirect', srv_namespace)
if len(possible_get_service_handle_direct_symbols) == 0:
raise Exception('Cannot find ctr::srv::GetServiceHandleDirect. Please find and label it!')
if len(possible_get_service_handle_direct_symbols) > 1:
raise Exception('More than one ctr::srv::GetServiceHandleDirect symbols. This script can only run if one exists!')
get_service_handle_direct = getSymbols('GetServiceHandleDirect', srv_namespace)[0]
get_handle_refs = getReferencesTo(get_service_handle_direct.getAddress())
service_handles = {}
for get_handle_ref in get_handle_refs:
addr = get_handle_ref.getFromAddress()
args = getCallArgsFromRef(get_handle_ref)
handle_ref = args[0]
service = args[1]
if handle_ref.isAddress():
handle_addr = memory.getInt(handle_ref.getAddress())
handle = toAddr(handle_addr)
caller_func = getFunctionContaining(addr)
if caller_func is not None:
caller_addr = caller_func.getEntryPoint()
service = service_caller_refs[caller_addr]
service_handles[service] = handle
label = getServiceHandleName(service)
createLabel(handle, label, ctr_namespace, True, SourceType.USER_DEFINED)
print('Created handle {} at {}'.format(label, handle))
else:
# TODO: This might be improved with pcode
print('Unable to detect handle name at {}'.format(addr))

173
Label3dsSVCs.py Normal file
View File

@ -0,0 +1,173 @@
# Bookmarks 3ds SVCs and either labels the function or sets a comment
# @category 3ds
#
from ghidra_utils import bytesToByteArr, nameCtrFunc
listing = currentProgram.getListing()
memory = currentProgram.getMemory()
svc_bytes = bytesToByteArr([0x00, 0x00, 0x00, 0xef])
svc_bytes_mask = bytesToByteArr([0x00, 0xff, 0xff, 0xff])
ctr_svcs = {
0x01: 'svcControlMemory',
0x02: 'svcQueryMemory',
0x03: 'svcExitProcess',
0x04: 'svcGetProcessAffinityMask',
0x05: 'svcSetProcessAffinityMask',
0x06: 'svcGetProcessIdealProcessor',
0x07: 'svcSetProcessIdealProcessor',
0x08: 'svcCreateThread',
0x09: 'svcExitThread',
0x0A: 'svcSleepThread',
0x0B: 'svcGetThreadPriority',
0x0C: 'svcSetThreadPriority',
0x0D: 'svcGetThreadAffinityMask',
0x0E: 'svcSetThreadAffinityMask',
0x0F: 'svcGetThreadIdealProcessor',
0x10: 'svcSetThreadIdealProcessor',
0x11: 'svcGetCurrentProcessorNumber',
0x12: 'svcRun',
0x13: 'svcCreateMutex',
0x14: 'svcReleaseMutex',
0x15: 'svcCreateSemaphore',
0x16: 'svcReleaseSemaphore',
0x17: 'svcCreateEvent',
0x18: 'svcSignalEvent',
0x19: 'svcClearEvent',
0x1A: 'svcCreateTimer',
0x1B: 'svcSetTimer',
0x1C: 'svcCancelTimer',
0x1D: 'svcClearTimer',
0x1E: 'svcCreateMemoryBlock',
0x1F: 'svcMapMemoryBlock',
0x20: 'svcUnmapMemoryBlock',
0x21: 'svcCreateAddressArbiter',
0x22: 'svcArbitrateAddress',
0x23: 'svcCloseHandle',
0x24: 'svcWaitSynchronization1',
0x25: 'svcWaitSynchronizationN',
0x26: 'svcSignalAndWait',
0x27: 'svcDuplicateHandle',
0x28: 'svcGetSystemTick',
0x29: 'svcGetHandleInfo',
0x2A: 'svcGetSystemInfo',
0x2B: 'svcGetProcessInfo',
0x2C: 'svcGetThreadInfo',
0x2D: 'svcConnectToPort',
0x2E: 'svcSendSyncRequest1',
0x2F: 'svcSendSyncRequest2',
0x30: 'svcSendSyncRequest3',
0x31: 'svcSendSyncRequest4',
0x32: 'svcSendSyncRequest',
0x33: 'svcOpenProcess',
0x34: 'svcOpenThread',
0x35: 'svcGetProcessId',
0x36: 'svcGetProcessIdOfThread',
0x37: 'svcGetThreadId',
0x38: 'svcGetResourceLimit',
0x39: 'svcGetResourceLimitLimitValues',
0x3A: 'svcGetResourceLimitCurrentValues',
0x3B: 'svcGetThreadContext',
0x3C: 'svcBreak',
0x3D: 'svcOutputDebugString',
0x3E: 'svcControlPerformanceCounter',
0x47: 'svcCreatePort',
0x48: 'svcCreateSessionToPort',
0x49: 'svcCreateSession',
0x4A: 'svcAcceptSession',
0x4B: 'svcReplyAndReceive1',
0x4C: 'svcReplyAndReceive2',
0x4D: 'svcReplyAndReceive3',
0x4E: 'svcReplyAndReceive4',
0x4F: 'svcReplyAndReceive',
0x50: 'svcBindInterrupt',
0x51: 'svcUnbindInterrupt',
0x52: 'svcInvalidateProcessDataCache',
0x53: 'svcStoreProcessDataCache',
0x54: 'svcFlushProcessDataCache',
0x55: 'svcStartInterProcessDma',
0x56: 'svcStopDma',
0x57: 'svcGetDmaState',
0x58: 'svcRestartDma',
0x59: 'svcSetGpuProt',
0x5A: 'svcSetWifiEnabled',
0x60: 'svcDebugActiveProcess',
0x61: 'svcBreakDebugProcess',
0x62: 'svcTerminateDebugProcess',
0x63: 'svcGetProcessDebugEvent',
0x64: 'svcContinueDebugEvent',
0x65: 'svcGetProcessList',
0x66: 'svcGetThreadList',
0x67: 'svcGetDebugThreadContext',
0x68: 'svcSetDebugThreadContext',
0x69: 'svcQueryDebugProcessMemory',
0x6A: 'svcReadProcessMemory',
0x6B: 'svcWriteProcessMemory',
0x6C: 'svcSetHardwareBreakPoint',
0x6D: 'svcGetDebugThreadParam',
0x70: 'svcControlProcessMemory',
0x71: 'svcMapProcessMemory',
0x72: 'svcUnmapProcessMemory',
0x73: 'svcCreateCodeSet',
0x74: 'svcRandomStub',
0x75: 'svcCreateProcess',
0x76: 'svcTerminateProcess',
0x77: 'svcSetProcessResourceLimits',
0x78: 'svcCreateResourceLimit',
0x79: 'svcSetResourceLimitValues',
0x7A: 'svcAddCodeSegment',
0x7B: 'svcBackdoor',
0x7C: 'svcKernelSetState',
0x7D: 'svcQueryProcessMemory',
0xFF: 'svcStopPoint',
}
start_address = memory.getExecuteSet().getMaxAddress()
while True:
svc_instruction_addr = memory.findBytes(start_address, svc_bytes, svc_bytes_mask, False, monitor)
if svc_instruction_addr is None:
break
svc_instruction_code = listing.getCodeUnitAt(svc_instruction_addr)
if svc_instruction_code is not None:
svc_id = svc_instruction_code.getBytes()[0]
svc_name = ctr_svcs.get(svc_id)
if svc_name is None:
# Could be data that was misinterpretted
start_address = svc_instruction_addr.subtract(4)
continue
# Show user current location in case we need to about inlining
setCurrentLocation(svc_instruction_addr)
if (
svc_name == 'svcGetSystemTick' or
svc_name == 'svcClearEvent' or
svc_name == 'svcCloseHandle' or
svc_name == 'svcSignalEvent' or
svc_name == 'svcExitProcess' or
'svcSendSyncRequest' in svc_name or
'svcWaitSynchronization' in svc_name
):
rename_to_svc = False
else:
# TODO: Be smarter about finding inlinved svcs. That shouldn't be too hard to do.
rename_to_svc = askYesNo('SVC Function Namer', 'Rename the current function to be the SVC?\nClicking no will create a comment instead.')
if rename_to_svc:
func = getFunctionAt(svc_instruction_addr)
func = func if func is not None else getFunctionBefore(svc_instruction_addr)
nameCtrFunc(func, svc_name)
else:
svc_instruction_code.setComment(svc_instruction_code.PRE_COMMENT, svc_name)
print('Bookmarked {} at {}'.format(svc_name, svc_instruction_addr.toString()))
createBookmark(svc_instruction_addr, 'SVC', svc_name)
start_address = svc_instruction_addr.subtract(4)

20
README.md Normal file
View File

@ -0,0 +1,20 @@
# 3ds Ghidra Scripts
These are ghidra scripts to help with 3ds reverse engineering.
Features:
- Labels, comments (when inlined), and bookmarks svc use
- Labels service handles, given ctr::srv::GetServiceHandleDirect
- Labels IPC functions and uses handles to better identify functions
- Adds `ThreadLocalStorage` and types thread local storage
- Renames thread local storage to 'tls'
These have been built over time for my personal use as needs came up, so results may vary and improvements can be made. If you run into a situation where these don't work as intended, I would happily accept a PR.
## Credits
Thanks to:
- [3dbrew](https://www.3dbrew.org/wiki/Services_API) for almost everything in the services.py file
- HackOvert for [their ghidra snippets](https://github.com/HackOvert/GhidraSnippets)

66
Set3dsTlsType.py Normal file
View File

@ -0,0 +1,66 @@
# Set thread local storage types
# @category 3ds
#
from ghidra.program.model.data import DataTypeConflictHandler
from ghidra.app.util.cparser.C import CParser
from ghidra.program.model.symbol import SourceType
from ghidra.program.model.pcode.HighFunctionDBUtil import commitLocalNamesToDatabase
from ghidra_utils import findByteArray, forceSetVariableName, decompileContainingFunction, nameCtrFunc
tls_struct = """
struct ThreadLocalStorage {
uint storage[32];
uint command_buffer[64];
uint static_buffers[32];
};
"""
# Add the ThreadLocalStorage and PThreadLocalStorage types
data_type_manager = currentProgram.getDataTypeManager()
parser = CParser(data_type_manager)
parsed_datatype = parser.parse(tls_struct)
data_type_manager.addDataType(parsed_datatype, DataTypeConflictHandler.DEFAULT_HANDLER)
parsed_datatype = parser.parse('typedef ThreadLocalStorage *PThreadLocalStorage;')
tls_pointer_datatype = data_type_manager.addDataType(parsed_datatype, DataTypeConflictHandler.DEFAULT_HANDLER)
# Handle inlined getThreadLocalStorage
get_tls_bytes = [0x70, 0x4f, 0x1d, 0xee]
get_tls_mask = [0xff, 0x0f, 0xff, 0xff]
tls_addrs = findByteArray(get_tls_bytes, get_tls_mask)
for tls_addr in tls_addrs:
decompiled_func, _ = decompileContainingFunction(tls_addr)
high_func = decompiled_func.getHighFunction()
commitLocalNamesToDatabase(high_func, SourceType.USER_DEFINED)
# Refresh high func after committing local names
decompiled_func, func = decompileContainingFunction(tls_addr)
high_func = decompiled_func.getHighFunction()
tls_symbol = None
lsm = high_func.getLocalSymbolMap()
high_symbols = lsm.getSymbols()
for high_symbol in high_symbols:
if high_symbol.getPCAddress() == tls_addr:
tls_symbol = high_symbol.getSymbol()
break
if tls_symbol is not None:
variables = func.getLocalVariables()
for variable in variables:
if tls_symbol == variable.getSymbol():
variable.setDataType(tls_pointer_datatype, SourceType.USER_DEFINED)
forceSetVariableName(variable, 'tls')
print('Retyped {} and set name to {}'.format(tls_addr, variable.getName()))
break
# Handle getThreadLocalStorage when not inlined
get_tls_bytes = [0x70, 0x4f, 0x1d, 0xee, 0x1e, 0xff, 0x2f ,0xe1]
get_tls_mask = [0xff, 0x0f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]
tls_addrs = findByteArray(get_tls_bytes, get_tls_mask)
for tls_addr in tls_addrs:
func = getFunctionContaining(tls_addr)
nameCtrFunc(func, 'getThreadLocalStorage')

4193
ctr_services.py Normal file

File diff suppressed because it is too large Load Diff

127
ghidra_utils.py Normal file
View File

@ -0,0 +1,127 @@
# Gross, but seems to be the way to get the ghidra apis/values from an external module
from __main__ import currentProgram, monitor
from array import array
from ghidra.program.model.symbol import SourceType
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.app.decompiler import DecompInterface
flatProgramAPI = FlatProgramAPI(currentProgram)
toAddr = flatProgramAPI.toAddr
getFunctionContaining = flatProgramAPI.getFunctionContaining
getNamespace = flatProgramAPI.getNamespace
memory = currentProgram.getMemory()
symbolTable = currentProgram.getSymbolTable()
namespaceManager = currentProgram.getNamespaceManager()
globalNamespace = namespaceManager.getGlobalNamespace()
decomp = DecompInterface()
decomp.openProgram(currentProgram)
def bytesToByteStr(bytes):
encoded_bytes = map(lambda byte : chr(byte), bytes)
return ''.join(encoded_bytes)
def bytesToByteArr(bytes):
return array('b', bytesToByteStr(bytes))
def findByteArray(bytes, mask = None):
result = []
addr = toAddr(0)
byte_arr = bytesToByteArr(bytes)
mask_arr = None
if mask is not None:
mask_arr = bytesToByteArr(mask)
while True:
addr = memory.findBytes(addr.add(len(bytes)), byte_arr, mask_arr, True, monitor)
if addr is not None:
result.append(addr)
else:
return result
def findFirstByteArray(bytes, mask = None):
byte_arr = bytesToByteArr(bytes)
mask_arr = None
if mask is not None:
mask_arr = bytesToByteArr(mask)
return memory.findBytes(toAddr(0), byte_arr, mask_arr, True, monitor)
def readBytes(addr, len):
byteArray = bytesToByteArr([0 for i in range(len)])
memory.getBytes(addr, byteArray)
return byteArray
def forceSetVariableName(variable, name):
while True:
try:
variable.setName(name, SourceType.USER_DEFINED)
break
except:
name = '_' + name
return name
def decompileContainingFunction(addr):
func = getFunctionContaining(addr)
return decomp.decompileFunction(func, 60, monitor), func
def getOrCreateNamespace(namespace_name, parent_namespace = globalNamespace):
namespace = getNamespace(parent_namespace, namespace_name)
if namespace is not None:
return namespace
return symbolTable.createNameSpace(parent_namespace, namespace_name, SourceType.USER_DEFINED)
def getOrCreateNestedNamespace(str, parent_namespace):
namespaces = str.split('::')
current_parent = parent_namespace
for namespace in namespaces:
if len(namespace) > 0:
current_parent = getOrCreateNamespace(namespace, current_parent)
return current_parent
def nameCtrFunc(func, name):
split_name = name.split('::')
base_name = split_name[-1]
namespace = '::'.join(split_name[:-1])
ctr_namespace = getOrCreateNamespace('ctr')
func_namespace = getOrCreateNestedNamespace(namespace, ctr_namespace)
func.setName(base_name, SourceType.USER_DEFINED)
func.setParentNamespace(func_namespace)
print('Named {} to {}'.format(func.getEntryPoint(), base_name))
def getCallArgs(caller_func, callee_addr):
decompiled_func = decomp.decompileFunction(caller_func, 60, monitor)
high_func = decompiled_func.getHighFunction()
if not high_func:
return []
opiter = high_func.getPcodeOps()
while opiter.hasNext():
op = opiter.next()
mnemonic = str(op.getMnemonic())
if mnemonic == 'CALL':
inputs = op.getInputs()
addr = inputs[0].getAddress()
if addr == callee_addr:
return inputs[1:]
return []
def getCallArgsFromRef(call_ref):
from_addr = call_ref.getFromAddress()
to_addr = call_ref.getToAddress()
func = getFunctionContaining(from_addr)
return getCallArgs(func, to_addr)
def getValueFromConstantVarnode(varnode):
addr = varnode.getAddress()
if addr.isConstantAddress():
return addr.getOffset()
return None