From 80b191af0854530a69b88875671c61419a3b6f5f Mon Sep 17 00:00:00 2001
From: Angus Gratton <gus@projectgus.com>
Date: Tue, 9 Feb 2016 21:03:50 +1100
Subject: [PATCH] tests: test_runner, working simple single-ESP "solo" test
 cases

---
 tests/Makefile             |   6 +-
 tests/cases/01_basic.c     |   8 -
 tests/cases/01_scheduler.c |  71 ++++++++
 tests/cases/02_heap.c      |  37 +++++
 tests/include/testcase.h   |  43 +++--
 tests/test_main.c          |  18 +-
 tests/test_runner.py       | 331 +++++++++++++++++++++++++++++++++++++
 7 files changed, 492 insertions(+), 22 deletions(-)
 delete mode 100644 tests/cases/01_basic.c
 create mode 100644 tests/cases/01_scheduler.c
 create mode 100644 tests/cases/02_heap.c
 create mode 100755 tests/test_runner.py

diff --git a/tests/Makefile b/tests/Makefile
index 0659600..bd95ab3 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -3,11 +3,15 @@ PROGRAM=tests
 EXTRA_LINKER_SCRIPTS = $(PROGRAM_DIR)ld/tests.ld
 PROGRAM_SRC_DIR = $(PROGRAM_DIR) $(PROGRAM_DIR)cases
 
+# Add unity test framework headers & core source file
+PROGRAM_INC_DIR = $(PROGRAM_DIR)unity/src
+PROGRAM_EXTRA_SRC_FILES = $(PROGRAM_DIR)unity/src/unity.c
+
-# append -u <basename_test_entry> to the linker arguments for
-# each source file in the 'cases' directory, so the test case
-# entries get added to the compiled binary
+# pass the object file for each source file in the 'cases'
+# directory directly to the linker, so the test case entries
+# always get linked into the compiled binary
 TESTCASE_SRC_FILES = $(wildcard $(PROGRAM_DIR)cases/*.c)
 TESTCASE_ENTRIES = $(sort $(patsubst %.c,%,$(TESTCASE_SRC_FILES)))
-EXTRA_LDFLAGS = $(foreach entry,$(TESTCASE_ENTRIES),-u testcase_$(notdir $(entry)))
+EXTRA_LDFLAGS = $(foreach entry,$(TESTCASE_ENTRIES),$(BUILD_DIR)program/$(entry).o)
 
 include ../common.mk
diff --git a/tests/cases/01_basic.c b/tests/cases/01_basic.c
deleted file mode 100644
index 0122dcd..0000000
--- a/tests/cases/01_basic.c
+++ /dev/null
@@ -1,8 +0,0 @@
-#include "testcase.h"
-
-DEFINE_SOLO_TESTCASE(01_basic)
-
-static bool a_01_basic()
-{
-    return false;
-}
diff --git a/tests/cases/01_scheduler.c b/tests/cases/01_scheduler.c
new file mode 100644
index 0000000..2a2a9e6
--- /dev/null
+++ b/tests/cases/01_scheduler.c
@@ -0,0 +1,71 @@
+#include "testcase.h"
+#include <FreeRTOS.h>
+#include <task.h>
+#include <esp/uart.h>
+
+/* Basic test cases to validate the FreeRTOS scheduler works */
+
+DEFINE_SOLO_TESTCASE(01_scheduler_basic)
+DEFINE_SOLO_TESTCASE(01_scheduler_priorities)
+
+void set_variable(void *pvParameters)
+{
+    bool *as_bool = (bool *)pvParameters;
+    *as_bool = true;
+    /* deliberately a busywait at the end, not vTaskSuspend, to test priorities */
+    while(1) { }
+}
+
+/* Really simple - do created tasks run? */
+static void a_01_scheduler_basic()
+{
+    volatile bool a = false, b = false, c = false;
+    printf("top of scheduler...\n");
+    uart_flush_txfifo(0);
+
+    xTaskCreate(set_variable, (signed char *)"set_a", 128, (void *)&a, tskIDLE_PRIORITY, NULL);
+    xTaskCreate(set_variable, (signed char *)"set_b", 128, (void *)&b, tskIDLE_PRIORITY, NULL);
+    xTaskCreate(set_variable, (signed char *)"set_c", 128, (void *)&c, tskIDLE_PRIORITY, NULL);
+
+    TEST_ASSERT_FALSE_MESSAGE(a, "task set_a shouldn't run yet");
+    TEST_ASSERT_FALSE_MESSAGE(b, "task set_b shouldn't run yet");
+    TEST_ASSERT_FALSE_MESSAGE(c, "task set_c shouldn't run yet");
+
+    vTaskDelay(5);
+
+    TEST_ASSERT_TRUE_MESSAGE(a, "task set_a should have run");
+    TEST_ASSERT_TRUE_MESSAGE(b, "task set_b should have run");
+    TEST_ASSERT_TRUE_MESSAGE(c, "task set_c should have run");
+    TEST_PASS;
+}
+
+/* Verify that a high-priority task will starve a lower priority task */
+static void a_01_scheduler_priorities()
+{
+    /* Increase priority of the init task to make sure it always takes priority */
+    vTaskPrioritySet(xTaskGetCurrentTaskHandle(), tskIDLE_PRIORITY+4);
+
+    bool lower = false, higher = false;
+    xTaskHandle task_lower, task_higher;
+
+    xTaskCreate(set_variable, (signed char *)"high_prio", 128, (void *)&higher, tskIDLE_PRIORITY+1, &task_higher);
+    xTaskCreate(set_variable, (signed char *)"low_prio", 128, (void *)&lower, tskIDLE_PRIORITY, &task_lower);
+
+    TEST_ASSERT_FALSE_MESSAGE(higher, "higher prio task should not have run yet");
+    TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run yet");
+
+    vTaskDelay(2);
+
+    TEST_ASSERT_TRUE_MESSAGE(higher, "higher prio task should have run");
+    TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run");
+
+    /* Bump lower priority task over higher priority task */
+    vTaskPrioritySet(task_lower, tskIDLE_PRIORITY+2);
+
+    TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should still not have run yet");
+
+    vTaskDelay(1);
+
+    TEST_ASSERT_TRUE_MESSAGE(lower, "lower prio task should have run");
+    TEST_PASS;
+}
diff --git a/tests/cases/02_heap.c b/tests/cases/02_heap.c
new file mode 100644
index 0000000..a779e3a
--- /dev/null
+++ b/tests/cases/02_heap.c
@@ -0,0 +1,37 @@
+#include "testcase.h"
+#include <malloc.h>
+#include <string.h>
+#include <FreeRTOS.h>
+
+DEFINE_SOLO_TESTCASE(02_heap_simple)
+
+/* Simple heap accounting tests */
+static void a_02_heap_simple()
+{
+    struct mallinfo info = mallinfo();
+    printf("'arena' allocation size %d bytes\n", info.arena);
+    /* This is really a sanity check, if the "arena" size shrinks then
+       this is a good thing and we can update the test. If it grows
+       then we can also update the test, but we need a good reason. */
+    TEST_ASSERT_INT_WITHIN_MESSAGE(1000, 15000, info.arena, "Initial allocated heap should be approximately 15kB. SEE COMMENT.");
+
+    uint32_t freeheap = xPortGetFreeHeapSize();
+    printf("xPortGetFreeHeapSize = %d bytes\n", freeheap);
+    TEST_ASSERT_TRUE_MESSAGE(freeheap > 20000, "Should be at least 20kB free.");
+
+    uint8_t *buf = malloc(8192);
+    /* have to do something with buf, or gcc helpfully optimises the allocation out! */
+    memset(buf, 0xEE, 8192);
+    uint32_t after = xPortGetFreeHeapSize();
+    struct mallinfo after_info = mallinfo();
+    printf("after arena size = %d bytes\n", after_info.arena);
+    printf("after xPortGetFreeHeapSize = %d bytes\n", after);
+    TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, info.arena+8192, after_info.arena, "Allocated heap 'after' size should be 8kB more than before");
+    TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap-8192, after, "Free heap size should be 8kB less than before");
+
+    free(buf);
+    after = xPortGetFreeHeapSize();
+    printf("after freeing xPortGetFreeHeapSize = %d bytes\n", after);
+    TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap, after, "Free heap size after freeing buffer should be close to initial");
+    TEST_PASS;
+}
diff --git a/tests/include/testcase.h b/tests/include/testcase.h
index 1d3e696..01c3198 100644
--- a/tests/include/testcase.h
+++ b/tests/include/testcase.h
@@ -4,6 +4,12 @@
 #include <stdio.h>
 #include "esp/uart.h"
 
+/* Unity is the framework with test assertions, etc. */
+#include "unity.h"
+
+/* Need to explicitly flag once a test has completed successfully. */
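+/* The trailing infinite loop stops the test function returning and hitting
+   the fall-through failure at the end of user_init() in test_main.c. */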
+#define TEST_PASS do { UnityConcludeTest(); while(1) { } } while (0)
+
 /* Types of test, defined by hardware requirements */
 typedef enum {
 SOLO,        /* Test requires "ESP A" only, no other connections */
@@ -11,30 +17,43 @@ typedef enum {
     EYORE_TEST,  /* Test requires an eyore-test board with onboard STM32F0 */
 } testcase_type_t;
 
-typedef bool (testcase_fn_t)(void);
+typedef void (testcase_fn_t)(void);
 
 typedef struct {
     char *name;
+    char *file;
+    uint16_t line;
     testcase_type_t type;
     testcase_fn_t *a_fn;
     testcase_fn_t *b_fn;
 } testcase_t;
 
+void testcase_register(const testcase_t *ignored);
 
 /* Register a test case using these macros. Use DEFINE_SOLO_TESTCASE for single-MCU tests,
    and DEFINE_TESTCASE for all other test types.
- */
-#define DEFINE_SOLO_TESTCASE(NAME)                                            \
-    static testcase_fn_t a_##NAME;                                          \
-    const __attribute__((section(".testcases.text"))) __attribute__((used)) \
-    testcase_t testcase_##NAME = { .name = #NAME, .type = SOLO, .a_fn = a_##NAME };
+*/
+#define DEFINE_SOLO_TESTCASE(NAME)                                      \
+    static testcase_fn_t a_##NAME;                                      \
+    _DEFINE_TESTCASE_COMMON(NAME, SOLO, a_##NAME, 0)
+
+#define DEFINE_TESTCASE(NAME, TYPE)                                     \
+    static testcase_fn_t a_##NAME;                                      \
+    static testcase_fn_t b_##NAME;                                      \
+    _DEFINE_TESTCASE_COMMON(NAME, TYPE, a_##NAME, b_##NAME)
 
 
-#define DEFINE_TESTCASE(NAME, TYPE)                                            \
-    static testcase_fn_t a_##NAME;                                          \
-    static testcase_fn_t b_##NAME;                                          \
-    const __attribute__((section(".testcases.text"))) __attribute__((used)) \
-    testcase_t testcase_##NAME = { .name = #NAME, .type = TYPE, .a_fn = a_##NAME, .b_fn = b_##NAME };
-
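+/* Shared macro body: place the testcase_t in the .testcases.text section,
+   and emit a constructor whose testcase_register() call keeps the linker
+   from discarding the structure (see testcase_register() in test_main.c). */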
+#define _DEFINE_TESTCASE_COMMON(NAME, TYPE, A_FN, B_FN)    \
+    const __attribute__((section(".testcases.text")))                   \
+    testcase_t testcase_##NAME = { .name = #NAME,                       \
+                                   .file = __FILE__,                    \
+                                   .line = __LINE__,                    \
+                                   .type = TYPE,                        \
+                                   .a_fn = A_FN,                        \
+                                   .b_fn = B_FN,                        \
+    };                                                                  \
+    void __attribute__((constructor)) testcase_ctor_##NAME() {          \
+        testcase_register(&testcase_##NAME);                            \
+    }
 
 #endif
diff --git a/tests/test_main.c b/tests/test_main.c
index 2f75390..48f2684 100644
--- a/tests/test_main.c
+++ b/tests/test_main.c
@@ -78,9 +78,25 @@ void user_init(void)
         type = 'A';
     else if (type == 'b')
         type = 'B';
-    printf("Running test case %d as %c (%s %s)\n", case_idx, type, cases_start[case_idx].name, get_requirements_name(cases_start[case_idx].type));
+    const testcase_t *tcase = &cases_start[case_idx];
+    printf("\nRunning test case %d (%s %s) as instance %c \nDefinition at %s:%d\n***\n", case_idx,
+           tcase->name, get_requirements_name(tcase->type), type,
+           tcase->file, tcase->line);
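+    /* Point Unity at this test case's metadata, so assertion failures are
+       reported against the right test name and source location. */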
+    Unity.CurrentTestName = tcase->name;
+    Unity.TestFile = tcase->file;
+    Unity.CurrentTestLineNumber = tcase->line;
+    Unity.NumberOfTests = 1;
     if(type=='A')
         cases_start[case_idx].a_fn();
     else
         cases_start[case_idx].b_fn();
+    TEST_FAIL_MESSAGE("\n\nTest initialisation routine returned without calling TEST_PASS. Buggy test?");
+}
+
+/* testcase_register is a no-op function; calling it from each test case's
+   constructor forces the linker to pull the test case in at link time.
+ */
+void testcase_register(const testcase_t __attribute__((unused)) *ignored)
+{
+
 }
diff --git a/tests/test_runner.py b/tests/test_runner.py
new file mode 100755
index 0000000..22e4c46
--- /dev/null
+++ b/tests/test_runner.py
@@ -0,0 +1,331 @@
+#!/usr/bin/env python3
+import sys
+import argparse
+import subprocess
+import os
+import serial
+import threading
+import re
+import time
+import traceback
+
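+# timeouts (in seconds): how long to wait for output after resetting a board,
+# and how long a single test case may run before it is considered hung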
+TEST_RESET_TIMEOUT=0.1
+TESTCASE_TIMEOUT=10
+TESTRUNNER_BANNER="esp-open-rtos test runner."
+
+def main():
+    global verbose
+    args = parse_args()
+    verbose = args.verbose
+
+    if not args.no_flash:
+        flash_image(args.aport)
+        if args.type != 'solo':
+            flash_image(args.bport)
+
+    env = TestEnvironment(args.aport, TestEnvironment.A)
+    cases = env.get_testlist()
+    if args.type != 'solo':
+        env_b = TestEnvironment(args.bport, TestEnvironment.B)
+        cases_b = env_b.get_testlist()
+        if cases != cases_b:
+            raise TestRunnerError("Test cases on units A & B don't match")
+
+    counts = dict((status,0) for status in TestResult.STATUS_NAMES.keys())
+    failures = False
+    for test in cases:
+        res = test.run(env)
+        counts[res.status] += 1
+        failures = failures or res.is_failure()
+
+    print("%20s: %d" % ("Total tests", sum(c for c in counts.values())))
+    print()
+    # print status counts for tests
+    for c in sorted(counts.keys()):
+        print("%20s: %d" % (TestResult.STATUS_NAMES[c], counts[c]))
+
+    sys.exit(1 if failures else 0)
+
+class TestCase(object):
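+    """ One test case enumerated from a test board: its index in the board's
+        case list, its name, and its hardware-requirements type. """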
+    def __init__(self, index, name, case_type):
+        self.name = name
+        self.index = index
+        self.case_type = case_type
+
+    def __repr__(self):
+        return "#%d: %s (%s)" % (self.index, self.name, self.case_type)
+
+    def __eq__(self, other):
+        return (self.index == other.index
+                and self.name == other.name
+                and self.case_type == other.case_type)
+
+    def run(self, env_a, env_b = None):
+        """
+        Run the test represented by this instance, against the environment(s) passed in.
+
+        Returns a TestResult
+        """
+        print("Running test case '%s'..." % self.name)
+        mon_a = env_a.start_testcase(self)
+        mon_b = env_b.start_testcase(self) if env_b else None
+        while True:
+            if mon_a.get_result() and (mon_b is None or mon_b.get_result()):
+                break # all running test environments have finished
+
+            # or, when both are running, stop as soon as either environment shows a failure
+            try:
+                if mon_a.get_result().is_failure():
+                    mon_b.cancel()
+                    break
+            except AttributeError:
+                pass
+            try:
+                if mon_b.get_result().is_failure():
+                    mon_a.cancel()
+                    break
+            except AttributeError:
+                pass
+            time.sleep(0.1)
+
+        if mon_b is not None:
+            # return whichever result is more severe
+            return max(mon_a.get_result(), mon_b.get_result())
+        else:
+            return mon_a.get_result()
+
+class TestResult(object):
+    """ Class to wrap a test result code and a message """
+    # Test status flags, higher = more severe
+    CANCELLED = 0
+    SKIPPED = 1
+    PASSED = 2
+    FAILED = 3
+    ERROR = 4
+
+    STATUS_NAMES = {
+        CANCELLED : "Cancelled",
+        SKIPPED : "Skipped",
+        PASSED : "Passed",
+        FAILED : "Failed",
+        ERROR : "Error"
+        }
+
+    def __init__(self, status, message):
+        self.status = status
+        self.message = message
+
+    def is_failure(self):
+        return self.status >= TestResult.FAILED
+
+    # Python 3 ignores __cmp__, so define rich comparisons; a result always
+    # compares as more severe than None (a still-running test)
+    def __lt__(self, other):
+        return other is not None and self.status < other.status
+
+    def __gt__(self, other):
+        return other is None or self.status > other.status
+
+class TestMonitor(object):
+    """ Class to monitor a running test case in a separate thread, defer reporting of the result until it's done.
+
+    Can poll for completion by calling is_done(), read a TestResult via .get_result()
+    """
+    def __init__(self, port, instance):
+        super(TestMonitor, self).__init__()
+        self._thread = threading.Thread(target=self._monitorThread)
+        self._port = port
+        self._instance = instance
+        self._result = None
+        self._cancelled = False
+        self.output = ""
+        self._thread.start()
+
+    def cancel(self):
+        self._cancelled = True
+
+    def is_done(self):
+        return self._result is not None
+
+    def get_result(self):
+        return self._result
+
+    def _monitorThread(self):
+        self.output = ""
+        start_time = time.time()
+        self._port.timeout = 0.1
+        try:
+            while not self._cancelled and time.time() < start_time + TESTCASE_TIMEOUT:
+                line = self._port.readline().decode("utf-8", "ignore")
+                if line == "":
+                    line = "(TIMED OUT)\r\n"
+                self.output += "%s+%4.2fs %s" % (self._instance, time.time()-start_time, line)
+                verbose_print(line.strip())
+                if line.endswith(":PASS\r\n"):
+                    self._result = TestResult(TestResult.PASSED, "Test passed.")
+                    return
+                elif ":FAIL:" in line:
+                    self._result = TestResult(TestResult.FAILED, "Test failed.")
+                    return
+                elif line.strip() == TESTRUNNER_BANNER:
+                    self._result = TestResult(TestResult.ERROR, "Test caused crash and reset.")
+                    return
+            if self._cancelled:
+                self._result = TestResult(TestResult.CANCELLED, "Cancelled")
+            else:
+                self._result = TestResult(TestResult.ERROR, "Test timed out")
+
+        finally:
+            self._port.timeout = None
+
+class TestEnvironment(object):
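+    """ Wraps the serial port attached to one test board: can reset the
+        board, enumerate its test cases, and start one of them running. """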
+    A = "A"
+    B = "B"
+
+    def __init__(self, port, instance):
+        self._name = port
+        self._port = TestSerialPort(port, baudrate=115200)
+        self._instance = instance
+
+    def reset(self):
+        """ Resets the test board, and waits for the test runner program to start up """
+        self._port.setDTR(False)
+        self._port.setRTS(True)
+        time.sleep(0.05)
+        self._port.flushInput()
+        self._port.setRTS(False)
+        verbose_print("Waiting for test runner startup...")
+        if not self._port.wait_line(lambda line: line == TESTRUNNER_BANNER):
+            raise TestRunnerError("Port %s failed to start test runner" % self._port)
+
+    def get_testlist(self):
+        """ Resets the test board and returns the enumerated list of all supported tests """
+        self.reset()
+        tests = []
+        verbose_print("Enumerating tests...")
+
+        def collect_testcases(line):
+            if line.startswith(">"):
+                return True # prompt means the list of test cases is done, success
+            m = re.match(r"CASE (\d+) = (.+?) ([A-Z]+)", line)
+            if m is not None:
+                t = TestCase(int(m.group(1)), m.group(2), m.group(3).lower())
+                verbose_print(t)
+                tests.append(t)
+        if not self._port.wait_line(collect_testcases):
+            raise TestRunnerError("Port %s failed to read test list" % self._port)
+        verbose_print("Port %s found %d test cases" % (self._name, len(tests)))
+        return tests
+
+    def start_testcase(self, case):
+        """ Starts the specified test instance and returns an TestMonitor reader thread instance to monitor the output """
+        # synchronously start the test case
+        self.reset()
+        if not self._port.wait_line(lambda line: line.startswith(">")):
+            raise TestRunnerError("Failed to read test runnner prompt")
+        command = "%s%d\r\n" % (self._instance, case.index)
+        self._port.write(command.encode("utf-8"))
+        return TestMonitor(self._port, self._instance)
+
+
+def get_testdir():
+    """
+    Return the 'tests' directory in the source tree
+    (assuming the test_runner.py script is in that directory).
+    """
+    res = os.path.dirname(__file__)
+    return "." if res == "" else res
+
+
+def flash_image(serial_port):
+    # Bit hacky: rather than calling esptool directly, just use the Makefile flash target
+    # with the correct ESPPORT argument
+    env = dict(os.environ)
+    env["ESPPORT"] = serial_port
+    verbose_print("Building and flashing test image to %s..." % serial_port)
+    try:
+        stdout = sys.stdout if verbose else subprocess.DEVNULL
+        subprocess.run(["make", "flash"], check=True, cwd=get_testdir(), stdout=stdout, stderr=subprocess.STDOUT, env=env)
+    except subprocess.CalledProcessError as e:
+        raise TestRunnerError("'make flash ESPPORT=%s' failed with exit code %d" % (serial_port, e.returncode))
+    verbose_print("Flashing successful.")
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description='esp-open-rtos testrunner', prog='test_runner')
+
+    parser.add_argument(
+        '--type', '-t',
+        help='Type of test hardware attached to serial ports A & (optionally) B',
+        choices=['solo','dual','eyore_test'], default='solo')
+
+    parser.add_argument(
+        '--aport', '-a',
+        help='Serial port for device A',
+        default='/dev/ttyUSB0')
+
+    parser.add_argument(
+        '--bport', '-b',
+        help='Serial port for device B (ignored if type is \'solo\')',
+        default='/dev/ttyUSB1')
+
+    parser.add_argument(
+        '--no-flash', '-n',
+        help='Don\'t flash the test binary image before running tests',
+        action='store_true',
+        default=False)
+
+    parser.add_argument(
+        '--verbose', '-v',
+        help='Verbose test runner debugging output',
+        action='store_true',
+        default=False)
+
+    parser.add_argument('testcases', nargs='*',
+                        help='Optional list of test cases to run. By default, all tests are run.')
+
+    return parser.parse_args()
+
+
+class TestRunnerError(RuntimeError):
+    def __init__(self, message):
+        RuntimeError.__init__(self, message)
+
+class TestSerialPort(serial.Serial):
+    def __init__(self, *args, **kwargs):
+        super(TestSerialPort, self).__init__(*args, **kwargs)
+
+    def wait_line(self, callback, timeout = TEST_RESET_TIMEOUT):
+        """ Wait for the port to output a particular piece of line content, as judged by callback
+
+            The callback is called as 'callback(line)' for each line read; it should return
+            a falsy value for a non-matching line, and any truthy value once it matches.
+
+            Returns the first truthy result from the callback, or None if it timed out
+            waiting for a new line.
+
+            Note that a serial port spewing legitimate lines of output may block this function forever, if callback
+            doesn't detect this is happening.
+        """
+        self.timeout = timeout
+        try:
+            res = None
+            while not res:
+                line = self.readline()
+                if line == b"":
+                    break # timed out
+                line = line.decode("utf-8", "ignore").rstrip()
+                res = callback(line)
+            return res
+        finally:
+            self.timeout = None
+
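+# global verbosity flag, set from the --verbose argument in main()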
+verbose = False
+
+
+def verbose_print(msg):
+    if verbose:
+        print(msg)
+
+if __name__ == '__main__':
+    try:
+        main()
+    except TestRunnerError as e:
+        print(e)
+        sys.exit(2)
+