teak-llvm/lldb/packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py
Jason Molenda 2d5d71c061 Revert this patch; I was emailing with Eugene and they have some other changes going
in today and don't want the two changes to confuse the situation with the build bots.
I'll commit tomorrow once they're known good.

llvm-svn: 313934
2017-09-21 23:02:56 +00:00

299 lines
11 KiB
Python

from __future__ import print_function
import json
import re
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteThreadsInStopReply(
gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
"read packet: $QListThreadsInStopReply#21",
"send packet: $OK#00",
]
def gather_stop_reply_fields(self, post_startup_log_lines, thread_count,
field_names):
# Set up the inferior args.
inferior_args = []
for i in range(thread_count - 1):
inferior_args.append("thread:new")
inferior_args.append("sleep:10")
procs = self.prep_debug_monitor_and_inferior(
inferior_args=inferior_args)
self.add_register_info_collection_packets()
self.add_process_info_collection_packets()
# Assumes test_sequence has anything added needed to setup the initial state.
# (Like optionally enabling QThreadsInStopReply.)
if post_startup_log_lines:
self.test_sequence.add_log_lines(post_startup_log_lines, True)
self.test_sequence.add_log_lines([
"read packet: $c#63"
], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
hw_info = self.parse_hw_info(context)
# Give threads time to start up, then break.
time.sleep(1)
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: {}".format(
chr(3)),
{
"direction": "send",
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
"capture": {
1: "stop_result",
2: "key_vals_text"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Wait until all threads have started.
threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
self.assertIsNotNone(threads)
self.assertEqual(len(threads), thread_count)
# Run, then stop the process, grab the stop reply content.
self.reset_test_sequence()
self.test_sequence.add_log_lines(["read packet: $c#63",
"read packet: {}".format(chr(3)),
{"direction": "send",
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
"capture": {1: "stop_result",
2: "key_vals_text"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Parse the stop reply contents.
key_vals_text = context.get("key_vals_text")
self.assertIsNotNone(key_vals_text)
kv_dict = self.parse_key_val_dict(key_vals_text)
self.assertIsNotNone(kv_dict)
result = dict();
result["pc_register"] = hw_info["pc_register"]
result["little_endian"] = hw_info["little_endian"]
for key_field in field_names:
result[key_field] = kv_dict.get(key_field)
return result
def gather_stop_reply_threads(self, post_startup_log_lines, thread_count):
# Pull out threads from stop response.
stop_reply_threads_text = self.gather_stop_reply_fields(
post_startup_log_lines, thread_count, ["threads"])["threads"]
if stop_reply_threads_text:
return [int(thread_id, 16)
for thread_id in stop_reply_threads_text.split(",")]
else:
return []
def gather_stop_reply_pcs(self, post_startup_log_lines, thread_count):
results = self.gather_stop_reply_fields( post_startup_log_lines,
thread_count, ["threads", "thread-pcs"])
if not results:
return []
threads_text = results["threads"]
pcs_text = results["thread-pcs"]
thread_ids = threads_text.split(",")
pcs = pcs_text.split(",")
self.assertTrue(len(thread_ids) == len(pcs))
thread_pcs = dict()
for i in range(0, len(pcs)):
thread_pcs[int(thread_ids[i], 16)] = pcs[i]
result = dict()
result["thread_pcs"] = thread_pcs
result["pc_register"] = results["pc_register"]
result["little_endian"] = results["little_endian"]
return result
def switch_endian(self, egg):
return "".join(reversed(re.findall("..", egg)))
def parse_hw_info(self, context):
self.assertIsNotNone(context)
process_info = self.parse_process_info_response(context)
endian = process_info.get("endian")
reg_info = self.parse_register_info_packets(context)
(pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info)
hw_info = dict()
hw_info["pc_register"] = pc_lldb_reg_index
hw_info["little_endian"] = (endian == "little")
return hw_info
def gather_threads_info_pcs(self, pc_register, little_endian):
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $jThreadsInfo#c1",
{
"direction": "send",
"regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
"capture": {
1: "threads_info"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
threads_info = context.get("threads_info")
register = str(pc_register)
# The jThreadsInfo response is not valid JSON data, so we have to
# clean it up first.
jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))
thread_pcs = dict()
for thread_info in jthreads_info:
tid = thread_info["tid"]
pc = thread_info["registers"][register]
thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
return thread_pcs
def QListThreadsInStopReply_supported(self):
procs = self.prep_debug_monitor_and_inferior()
self.test_sequence.add_log_lines(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@debugserver_test
def test_QListThreadsInStopReply_supported_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.QListThreadsInStopReply_supported()
@llgs_test
def test_QListThreadsInStopReply_supported_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.QListThreadsInStopReply_supported()
def stop_reply_reports_multiple_threads(self, thread_count):
# Gather threads from stop notification when QThreadsInStopReply is
# enabled.
stop_reply_threads = self.gather_stop_reply_threads(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
self.assertEqual(len(stop_reply_threads), thread_count)
@debugserver_test
def test_stop_reply_reports_multiple_threads_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_multiple_threads(5)
@llgs_test
def test_stop_reply_reports_multiple_threads_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_multiple_threads(5)
def no_QListThreadsInStopReply_supplies_no_threads(self, thread_count):
# Gather threads from stop notification when QThreadsInStopReply is not
# enabled.
stop_reply_threads = self.gather_stop_reply_threads(None, thread_count)
self.assertEqual(len(stop_reply_threads), 0)
@debugserver_test
def test_no_QListThreadsInStopReply_supplies_no_threads_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.no_QListThreadsInStopReply_supplies_no_threads(5)
@llgs_test
def test_no_QListThreadsInStopReply_supplies_no_threads_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.no_QListThreadsInStopReply_supplies_no_threads(5)
def stop_reply_reports_correct_threads(self, thread_count):
# Gather threads from stop notification when QThreadsInStopReply is
# enabled.
stop_reply_threads = self.gather_stop_reply_threads(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
self.assertEqual(len(stop_reply_threads), thread_count)
# Gather threads from q{f,s}ThreadInfo.
self.reset_test_sequence()
self.add_threadinfo_collection_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
threads = self.parse_threadinfo_packets(context)
self.assertIsNotNone(threads)
self.assertEqual(len(threads), thread_count)
# Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
for tid in threads:
self.assertTrue(tid in stop_reply_threads)
@debugserver_test
def test_stop_reply_reports_correct_threads_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_correct_threads(5)
@llgs_test
def test_stop_reply_reports_correct_threads_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_correct_threads(5)
def stop_reply_contains_thread_pcs(self, thread_count):
results = self.gather_stop_reply_pcs(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
stop_reply_pcs = results["thread_pcs"]
pc_register = results["pc_register"]
little_endian = results["little_endian"]
self.assertEqual(len(stop_reply_pcs), thread_count)
threads_info_pcs = self.gather_threads_info_pcs(pc_register,
little_endian)
self.assertEqual(len(threads_info_pcs), thread_count)
for thread_id in stop_reply_pcs:
self.assertTrue(thread_id in threads_info_pcs)
self.assertTrue(int(stop_reply_pcs[thread_id], 16)
== int(threads_info_pcs[thread_id], 16))
@llgs_test
def test_stop_reply_contains_thread_pcs_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_contains_thread_pcs(5)
@debugserver_test
def test_stop_reply_contains_thread_pcs_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_contains_thread_pcs(5)