diff --git a/tests/Makefile b/tests/Makefile index 0659600..bd95ab3 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -3,11 +3,15 @@ PROGRAM=tests EXTRA_LINKER_SCRIPTS = $(PROGRAM_DIR)ld/tests.ld PROGRAM_SRC_DIR = $(PROGRAM_DIR) $(PROGRAM_DIR)cases +# Add unity test framework headers & core source file +PROGRAM_INC_DIR = $(PROGRAM_DIR)unity/src +PROGRAM_EXTRA_SRC_FILES = $(PROGRAM_DIR)unity/src/unity.c + # append -u +#include +#include + +/* Basic test cases to validate the FreeRTOS scheduler works */ + +DEFINE_SOLO_TESTCASE(01_scheduler_basic) +DEFINE_SOLO_TESTCASE(01_scheduler_priorities) + +void set_variable(void *pvParameters) +{ + bool *as_bool = (bool *)pvParameters; + *as_bool = true; + /* deliberately a busywait at the end, not vTaskSuspend, to test priorities */ + while(1) { } +} + +/* Really simple - do created tasks run? */ +static void a_01_scheduler_basic() +{ + volatile bool a = false, b = false, c = false; + printf("top of scheduler...\n"); + uart_flush_txfifo(0); + + xTaskCreate(set_variable, (signed char *)"set_a", 128, (void *)&a, tskIDLE_PRIORITY, NULL); + xTaskCreate(set_variable, (signed char *)"set_b", 128, (void *)&b, tskIDLE_PRIORITY, NULL); + xTaskCreate(set_variable, (signed char *)"set_c", 128, (void *)&c, tskIDLE_PRIORITY, NULL); + + TEST_ASSERT_FALSE_MESSAGE(a, "task set_a shouldn't run yet"); + TEST_ASSERT_FALSE_MESSAGE(b, "task set_b shouldn't run yet"); + TEST_ASSERT_FALSE_MESSAGE(c, "task set_c shouldn't run yet"); + + vTaskDelay(5); + + TEST_ASSERT_TRUE_MESSAGE(a, "task set_a should have run"); + TEST_ASSERT_TRUE_MESSAGE(b, "task set_b should have run"); + TEST_ASSERT_TRUE_MESSAGE(c, "task set_c should have run"); + TEST_PASS; +} + +/* Verify that a high-priority task will starve a lower priority task */ +static void a_01_scheduler_priorities() +{ + /* Increase priority of the init task to make sure it always takes priority */ + vTaskPrioritySet(xTaskGetCurrentTaskHandle(), tskIDLE_PRIORITY+4); + + bool lower = false, higher = false; + xTaskHandle task_lower, task_higher; + + xTaskCreate(set_variable, (signed char *)"high_prio", 128, (void *)&higher, tskIDLE_PRIORITY+1, &task_higher); + xTaskCreate(set_variable, (signed char *)"low_prio", 128, (void *)&lower, tskIDLE_PRIORITY, &task_lower); + + TEST_ASSERT_FALSE_MESSAGE(higher, "higher prio task should not have run yet"); + TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run yet"); + + vTaskDelay(2); + + TEST_ASSERT_TRUE_MESSAGE(higher, "higher prio task should have run"); + TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run"); + + /* Bump lower priority task over higher priority task */ + vTaskPrioritySet(task_lower, tskIDLE_PRIORITY+2); + + TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should still not have run yet"); + + vTaskDelay(1); + + TEST_ASSERT_TRUE_MESSAGE(lower, "lower prio task should have run"); + TEST_PASS; +} diff --git a/tests/cases/02_heap.c b/tests/cases/02_heap.c new file mode 100644 index 0000000..a779e3a --- /dev/null +++ b/tests/cases/02_heap.c @@ -0,0 +1,37 @@ +#include "testcase.h" +#include +#include +#include + +DEFINE_SOLO_TESTCASE(02_heap_simple) + +/* Simple heap accounting tests */ +static void a_02_heap_simple() +{ + struct mallinfo info = mallinfo(); + printf("'arena' allocation size %d bytes\n", info.arena); + /* This is really a sanity check, if the "arena" size shrinks then + this is a good thing and we can update the test. If it grows + then we can also update the test, but we need a good reason. */ + TEST_ASSERT_INT_WITHIN_MESSAGE(1000, 15000, info.arena, "Initial allocated heap should be approximately 15kB. SEE COMMENT."); + + uint32_t freeheap = xPortGetFreeHeapSize(); + printf("xPortGetFreeHeapSize = %d bytes\n", freeheap); + TEST_ASSERT_TRUE_MESSAGE(freeheap > 20000, "Should be at least 20kB free."); + + uint8_t *buf = malloc(8192); + /* <-- have to do something with buf or gcc helpfully optimises it out! */ + memset(buf, 0xEE, 8192); + uint32_t after = xPortGetFreeHeapSize(); + struct mallinfo after_info = mallinfo(); + printf("after arena size = %d bytes\n", after_info.arena); + printf("after xPortGetFreeHeapSize = %d bytes\n", after); + TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, info.arena+8192, after_info.arena, "Allocated heap 'after' size should be 8kB more than before"); + TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap-8192, after, "Free heap size should be 8kB less than before"); + + free(buf); + after = xPortGetFreeHeapSize(); + printf("after freeing xPortGetFreeHeapSize = %d bytes\n", after); + TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap, after, "Free heap size after freeing buffer should be close to initial"); + TEST_PASS; +} diff --git a/tests/include/testcase.h b/tests/include/testcase.h index 1d3e696..01c3198 100644 --- a/tests/include/testcase.h +++ b/tests/include/testcase.h @@ -4,6 +4,12 @@ #include #include "esp/uart.h" +/* Unity is the framework with test assertions, etc. */ +#include "unity.h" + +/* Need to explicitly flag once a test has completed successfully. */ +#define TEST_PASS do { UnityConcludeTest(); while(1) { } } while (0) + /* Types of test, defined by hardware requirements */ typedef enum { SOLO, /* Test require "ESP A" only, no other connections */ @@ -11,30 +17,43 @@ typedef enum { EYORE_TEST, /* Test requires an eyore-test board with onboard STM32F0 */ } testcase_type_t; -typedef bool (testcase_fn_t)(void); +typedef void (testcase_fn_t)(void); typedef struct { char *name; + char *file; + uint8_t line; testcase_type_t type; testcase_fn_t *a_fn; testcase_fn_t *b_fn; } testcase_t; +void testcase_register(const testcase_t *ignored); /* Register a test case using these macros. Use DEFINE_SOLO_TESTCASE for single-MCU tests, and DEFINE_TESTCASE for all other test types. - */ -#define DEFINE_SOLO_TESTCASE(NAME) \ - static testcase_fn_t a_##NAME; \ - const __attribute__((section(".testcases.text"))) __attribute__((used)) \ - testcase_t testcase_##NAME = { .name = #NAME, .type = SOLO, .a_fn = a_##NAME }; +*/ +#define DEFINE_SOLO_TESTCASE(NAME) \ + static testcase_fn_t a_##NAME; \ + _DEFINE_TESTCASE_COMMON(NAME, SOLO, a_##NAME, 0) + +#define DEFINE_TESTCASE(NAME, TYPE) \ + static testcase_fn_t a_##NAME; \ + static testcase_fn_t b_##NAME; \ + _DEFINE_TESTCASE_COMMON(NAME, TYPE, A_##NAME, B_##NAME) -#define DEFINE_TESTCASE(NAME, TYPE) \ - static testcase_fn_t a_##NAME; \ - static testcase_fn_t b_##NAME; \ - const __attribute__((section(".testcases.text"))) __attribute__((used)) \ - testcase_t testcase_##NAME = { .name = #NAME, .type = TYPE, .a_fn = a_##NAME, .b_fn = b_##NAME }; - +#define _DEFINE_TESTCASE_COMMON(NAME, TYPE, A_FN, B_FN) \ + const __attribute__((section(".testcases.text"))) \ + testcase_t testcase_##NAME = { .name = #NAME, \ + .file = __FILE__, \ + .line = __LINE__, \ + .type = TYPE, \ + .a_fn = A_FN, \ + .b_fn = B_FN, \ + }; \ + void __attribute__((constructor)) testcase_ctor_##NAME() { \ + testcase_register(&testcase_##NAME); \ + } #endif diff --git a/tests/test_main.c b/tests/test_main.c index 2f75390..48f2684 100644 --- a/tests/test_main.c +++ b/tests/test_main.c @@ -78,9 +78,25 @@ void user_init(void) type = 'A'; else if (type == 'b') type = 'B'; - printf("Running test case %d as %c (%s %s)\n", case_idx, type, cases_start[case_idx].name, get_requirements_name(cases_start[case_idx].type)); + const testcase_t *tcase = &cases_start[case_idx]; + printf("\nRunning test case %d (%s %s) as instance %c \nDefinition at %s:%d\n***\n", case_idx, + tcase->name, get_requirements_name(tcase->type), type, + tcase->file, tcase->line); + Unity.CurrentTestName = tcase->name; + Unity.TestFile = tcase->file; + Unity.CurrentTestLineNumber = tcase->line; + Unity.NumberOfTests = 1; if(type=='A') cases_start[case_idx].a_fn(); else cases_start[case_idx].b_fn(); + TEST_FAIL_MESSAGE("\n\nTest initialisation routine returned without calling TEST_PASS. Buggy test?"); +} + +/* testcase_register is a no-op function, we just need it so the linker + knows to pull in the argument at link time. + */ +void testcase_register(const testcase_t __attribute__((unused)) *ignored) +{ + } diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100755 index 0000000..22e4c46 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python3 +import sys +import argparse +import subprocess +import os +import serial +import threading +import re +import time +import traceback + +TEST_RESET_TIMEOUT=0.1 +TESTCASE_TIMEOUT=10 +TESTRUNNER_BANNER="esp-open-rtos test runner." + +def main(): + global verbose + args = parse_args() + verbose = args.verbose + + if not args.no_flash: + flash_image(args.aport) + if args.type != 'solo': + flash_image(args.bport) + + env = TestEnvironment(args.aport, TestEnvironment.A) + cases = env.get_testlist() + if args.type != 'solo': + env_b = TestEnvironment(args.bport, TestEnvironment.B) + cases_b = env_b.get_testlist() + if cases != cases_b: + raise TestRunnerError("Test cases on units A & B don't match") + + counts = dict((status,0) for status in TestResult.STATUS_NAMES.keys()) + failures = False + for test in cases: + res = test.run(env) + counts[res.status] += 1 + failures = failures or res.is_failure() + + print("%20s: %d" % ("Total tests", sum(c for c in counts.values()))) + print() + # print status counts for tests + for c in sorted(counts.keys()): + print("%20s: %d" % (TestResult.STATUS_NAMES[c], counts[c])) + + sys.exit(1 if failures else 0) + +class TestCase(object): + def __init__(self, index, name, case_type): + self.name = name + self.index = index + self.case_type = case_type + + def __repr__(self): + return "#%d: %s (%s)" % (self.index, self.name, self.case_type) + + def __eq__(self, other): + return (self.index == other.index + and self.name == other.name + and self.case_type == other.case_type) + + def run(self, env_a, env_b = None): + """ + Run the test represented by this instance, against the environment(s) passed in. + + Returns a TestResult + """ + print("Running test case '%s'..." % self.name) + mon_a = env_a.start_testcase(self) + mon_b = env_b.start_testcase(self) if env_b else None + while True: + if mon_a.get_result() and (mon_b is None or mon_b.get_result()): + break # all running test environments have finished + + # or, in the case both are running, stop as soon as either environemnt shows a failure + try: + if mon_a.get_result().is_failure(): + mon_b.cancel() + break + except AttributeError: + pass + try: + if mon_b.get_result().is_failure(): + mon_a.cancel() + break + except AttributeError: + pass + time.sleep(0.1) + + if mon_b is not None: + # return whichever result is more severe + return max(mon_a.get_result(), mon_b.get_result()) + else: + return mon_a.get_result() + +class TestResult(object): + """ Class to wrap a test result code and a message """ + # Test status flags, higher = more severe + CANCELLED = 0 + SKIPPED = 1 + PASSED = 2 + FAILED = 3 + ERROR = 4 + + STATUS_NAMES = { + CANCELLED : "Cancelled", + SKIPPED : "Skipped", + PASSED : "Passed", + FAILED : "Failed", + ERROR : "Error" + } + + def __init__(self, status, message): + self.status = status + self.message = message + + def is_failure(self): + return self.status >= TestResult.FAILED + + def __cmp__(self, other): + if other is None: + return 1 + return self.status - other.status + +class TestMonitor(object): + """ Class to monitor a running test case in a separate thread, defer reporting of the result until it's done. + + Can poll for completion by calling is_done(), read a TestResult via .get_result() + """ + def __init__(self, port, instance): + super(TestMonitor, self).__init__() + self._thread = threading.Thread(target=self._monitorThread) + self._port = port + self._instance = instance + self._result = None + self._cancelled = False + self.output = "" + self._thread.start() + + def cancel(self): + self._cancelled = True + + def is_done(self): + return self._result is not None + + def get_result(self): + return self._result + + def _monitorThread(self): + self.output = "" + start_time = time.time() + self._port.timeout = 0.1 + try: + while not self._cancelled and time.time() < start_time + TESTCASE_TIMEOUT: + line = self._port.readline().decode("utf-8", "ignore") + if line == "": + line = "(TIMED OUT)\r\n" + self.output += "%s+%4.2fs %s" % (self._instance, time.time()-start_time, line) + verbose_print(line.strip()) + if line.endswith(":PASS\r\n"): + self._result = TestResult(TestResult.PASSED, "Test passed.") + return + elif ":FAIL:" in line: + self._result = TestResult(TestResult.FAILED, "Test failed.") + return + elif line == TESTRUNNER_BANNER: + self._result = TestResult(TestResult.ERROR, "Test caused crash and reset.") + return + if not self._cancelled: + self._result = TestResult(TestResult.CANCELLED, "Cancelled") + else: + self._result = TestResult(TestResult.ERROR, "Test timed out") + + finally: + self._port.timeout = None + +class TestEnvironment(object): + A = "A" + B = "B" + + def __init__(self, port, instance): + self._name = port + self._port = TestSerialPort(port, baudrate=115200) + self._instance = instance + + def reset(self): + """ Resets the test board, and waits for the test runner program to start up """ + self._port.setDTR(False) + self._port.setRTS(True) + time.sleep(0.05) + self._port.flushInput() + self._port.setRTS(False) + verbose_print("Waiting for test runner startup...") + if not self._port.wait_line(lambda line: line == TESTRUNNER_BANNER): + raise TestRunnerError("Port %s failed to start test runner" % self._port) + + def get_testlist(self): + """ Resets the test board and returns the enumerated list of all supported tests """ + self.reset() + tests = [] + verbose_print("Enumerating tests...") + + def collect_testcases(line): + if line.startswith(">"): + return True # prompt means list of test cases is done, success + m = re.match(r"CASE (\d+) = (.+?) ([A-Z]+)", line) + if m is not None: + t = TestCase(int(m.group(1)), m.group(2), m.group(3).lower()) + verbose_print(t) + tests.append(t) + if not self._port.wait_line(collect_testcases): + raise TestRunnerError("Port %s failed to read test list" % self._port) + verbose_print("Port %s found %d test cases" % (self._name, len(tests))) + return tests + + def start_testcase(self, case): + """ Starts the specified test instance and returns an TestMonitor reader thread instance to monitor the output """ + # synchronously start the test case + self.reset() + if not self._port.wait_line(lambda line: line.startswith(">")): + raise TestRunnerError("Failed to read test runnner prompt") + command = "%s%d\r\n" % (self._instance, case.index) + self._port.write(command.encode("utf-8")) + return TestMonitor(self._port, self._instance) + + +def get_testdir(): + """ + Return the 'tests' directory in the source tree + (assuming the test_runner.py script is in that directory. + """ + res = os.path.dirname(__name__) + return "." if res == "" else res + + +def flash_image(serial_port): + # Bit hacky: rather than calling esptool directly, just use the Makefile flash target + # with the correct ESPPORT argument + env = dict(os.environ) + env["ESPPORT"] = serial_port + verbose_print("Building and flashing test image to %s..." % serial_port) + try: + stdout = sys.stdout if verbose else None + output = subprocess.run(["make","flash"], check=True, cwd=get_testdir(), stdout=stdout, stderr=subprocess.STDOUT, env=env) + except subprocess.CalledProcessError as e: + raise TestRunnerError("'make flash EPPORT=%s' failed with exit code %d" % (serial_port, e.returncode)) + verbose_print("Flashing successful.") + + +def parse_args(): + parser = argparse.ArgumentParser(description='esp-open-rtos testrunner', prog='test_runner') + + parser.add_argument( + '--type', '-t', + help='Type of test hardware attached to serial ports A & (optionally) B', + choices=['solo','dual','eyore_test'], default='solo') + + parser.add_argument( + '--aport', '-a', + help='Serial port for device A', + default='/dev/ttyUSB0') + + parser.add_argument( + '--bport', '-b', + help='Serial port for device B (ignored if type is \'solo\')', + default='/dev/ttyUSB1') + + parser.add_argument( + '--no-flash', '-n', + help='Don\'t flash the test binary image before running tests', + action='store_true', + default=False) + + parser.add_argument( + '--verbose', '-v', + help='Verbose test runner debugging output', + action='store_true', + default=False) + + parser.add_argument('testcases', nargs='*', + help='Optional list of test cases to run. By default, all tests are run.') + + return parser.parse_args() + + +class TestRunnerError(RuntimeError): + def __init__(self, message): + RuntimeError.__init__(self, message) + +class TestSerialPort(serial.Serial): + def __init__(self, *args, **kwargs): + super(TestSerialPort, self).__init__(*args, **kwargs) + + def wait_line(self, callback, timeout = TEST_RESET_TIMEOUT): + """ Wait for the port to output a particular piece of line content, as judged by callback + + Callback called as 'callback(line)' and returns not-True if non-match otherwise can return any value. + + Returns first non-False result from the callback, or None if it timed out waiting for a new line. + + Note that a serial port spewing legitimate lines of output may block this function forever, if callback + doesn't detect this is happening. + """ + self.timeout = timeout + try: + res = None + while not res: + line = self.readline() + if line == b"": + break # timed out + line = line.decode("utf-8", "ignore").rstrip() + res = callback(line) + return res + finally: + self.timeout = None + +verbose = False + + +def verbose_print(msg): + if verbose: + print(msg) + +if __name__ == '__main__': + try: + main() + except TestRunnerError as e: + print(e) + sys.exit(2) +