tests: test_runner, working simple single-ESP "solo" test cases
This commit is contained in:
parent
97a46e8c1a
commit
80b191af08
7 changed files with 492 additions and 22 deletions
|
@ -3,11 +3,15 @@ PROGRAM=tests
|
||||||
EXTRA_LINKER_SCRIPTS = $(PROGRAM_DIR)ld/tests.ld
|
EXTRA_LINKER_SCRIPTS = $(PROGRAM_DIR)ld/tests.ld
|
||||||
PROGRAM_SRC_DIR = $(PROGRAM_DIR) $(PROGRAM_DIR)cases
|
PROGRAM_SRC_DIR = $(PROGRAM_DIR) $(PROGRAM_DIR)cases
|
||||||
|
|
||||||
|
# Add unity test framework headers & core source file
|
||||||
|
PROGRAM_INC_DIR = $(PROGRAM_DIR)unity/src
|
||||||
|
PROGRAM_EXTRA_SRC_FILES = $(PROGRAM_DIR)unity/src/unity.c
|
||||||
|
|
||||||
# append -u <basename_test_entry to the linker arguments for
|
# append -u <basename_test_entry to the linker arguments for
|
||||||
# each source file in the 'cases' directory, so the test case
|
# each source file in the 'cases' directory, so the test case
|
||||||
# entries get added to the compiled binary
|
# entries get added to the compiled binary
|
||||||
TESTCASE_SRC_FILES = $(wildcard $(PROGRAM_DIR)cases/*.c)
|
TESTCASE_SRC_FILES = $(wildcard $(PROGRAM_DIR)cases/*.c)
|
||||||
TESTCASE_ENTRIES = $(sort $(patsubst %.c,%,$(TESTCASE_SRC_FILES)))
|
TESTCASE_ENTRIES = $(sort $(patsubst %.c,%,$(TESTCASE_SRC_FILES)))
|
||||||
EXTRA_LDFLAGS = $(foreach entry,$(TESTCASE_ENTRIES),-u testcase_$(notdir $(entry)))
|
EXTRA_LDFLAGS = $(foreach entry,$(TESTCASE_ENTRIES),$(BUILD_DIR)program/$(entry).o)
|
||||||
|
|
||||||
include ../common.mk
|
include ../common.mk
|
||||||
|
|
|
@ -1,8 +0,0 @@
|
||||||
#include "testcase.h"
|
|
||||||
|
|
||||||
DEFINE_SOLO_TESTCASE(01_basic)
|
|
||||||
|
|
||||||
static bool a_01_basic()
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
71
tests/cases/01_scheduler.c
Normal file
71
tests/cases/01_scheduler.c
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
#include "testcase.h"
|
||||||
|
#include <FreeRTOS.h>
|
||||||
|
#include <task.h>
|
||||||
|
#include <esp/uart.h>
|
||||||
|
|
||||||
|
/* Basic test cases to validate the FreeRTOS scheduler works */
|
||||||
|
|
||||||
|
DEFINE_SOLO_TESTCASE(01_scheduler_basic)
|
||||||
|
DEFINE_SOLO_TESTCASE(01_scheduler_priorities)
|
||||||
|
|
||||||
|
void set_variable(void *pvParameters)
|
||||||
|
{
|
||||||
|
bool *as_bool = (bool *)pvParameters;
|
||||||
|
*as_bool = true;
|
||||||
|
/* deliberately a busywait at the end, not vTaskSuspend, to test priorities */
|
||||||
|
while(1) { }
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Really simple - do created tasks run? */
|
||||||
|
static void a_01_scheduler_basic()
|
||||||
|
{
|
||||||
|
volatile bool a = false, b = false, c = false;
|
||||||
|
printf("top of scheduler...\n");
|
||||||
|
uart_flush_txfifo(0);
|
||||||
|
|
||||||
|
xTaskCreate(set_variable, (signed char *)"set_a", 128, (void *)&a, tskIDLE_PRIORITY, NULL);
|
||||||
|
xTaskCreate(set_variable, (signed char *)"set_b", 128, (void *)&b, tskIDLE_PRIORITY, NULL);
|
||||||
|
xTaskCreate(set_variable, (signed char *)"set_c", 128, (void *)&c, tskIDLE_PRIORITY, NULL);
|
||||||
|
|
||||||
|
TEST_ASSERT_FALSE_MESSAGE(a, "task set_a shouldn't run yet");
|
||||||
|
TEST_ASSERT_FALSE_MESSAGE(b, "task set_b shouldn't run yet");
|
||||||
|
TEST_ASSERT_FALSE_MESSAGE(c, "task set_c shouldn't run yet");
|
||||||
|
|
||||||
|
vTaskDelay(5);
|
||||||
|
|
||||||
|
TEST_ASSERT_TRUE_MESSAGE(a, "task set_a should have run");
|
||||||
|
TEST_ASSERT_TRUE_MESSAGE(b, "task set_b should have run");
|
||||||
|
TEST_ASSERT_TRUE_MESSAGE(c, "task set_c should have run");
|
||||||
|
TEST_PASS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Verify that a high-priority task will starve a lower priority task */
|
||||||
|
static void a_01_scheduler_priorities()
|
||||||
|
{
|
||||||
|
/* Increase priority of the init task to make sure it always takes priority */
|
||||||
|
vTaskPrioritySet(xTaskGetCurrentTaskHandle(), tskIDLE_PRIORITY+4);
|
||||||
|
|
||||||
|
bool lower = false, higher = false;
|
||||||
|
xTaskHandle task_lower, task_higher;
|
||||||
|
|
||||||
|
xTaskCreate(set_variable, (signed char *)"high_prio", 128, (void *)&higher, tskIDLE_PRIORITY+1, &task_higher);
|
||||||
|
xTaskCreate(set_variable, (signed char *)"low_prio", 128, (void *)&lower, tskIDLE_PRIORITY, &task_lower);
|
||||||
|
|
||||||
|
TEST_ASSERT_FALSE_MESSAGE(higher, "higher prio task should not have run yet");
|
||||||
|
TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run yet");
|
||||||
|
|
||||||
|
vTaskDelay(2);
|
||||||
|
|
||||||
|
TEST_ASSERT_TRUE_MESSAGE(higher, "higher prio task should have run");
|
||||||
|
TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run");
|
||||||
|
|
||||||
|
/* Bump lower priority task over higher priority task */
|
||||||
|
vTaskPrioritySet(task_lower, tskIDLE_PRIORITY+2);
|
||||||
|
|
||||||
|
TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should still not have run yet");
|
||||||
|
|
||||||
|
vTaskDelay(1);
|
||||||
|
|
||||||
|
TEST_ASSERT_TRUE_MESSAGE(lower, "lower prio task should have run");
|
||||||
|
TEST_PASS;
|
||||||
|
}
|
37
tests/cases/02_heap.c
Normal file
37
tests/cases/02_heap.c
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
#include "testcase.h"
|
||||||
|
#include <malloc.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <FreeRTOS.h>
|
||||||
|
|
||||||
|
DEFINE_SOLO_TESTCASE(02_heap_simple)
|
||||||
|
|
||||||
|
/* Simple heap accounting tests */
|
||||||
|
static void a_02_heap_simple()
|
||||||
|
{
|
||||||
|
struct mallinfo info = mallinfo();
|
||||||
|
printf("'arena' allocation size %d bytes\n", info.arena);
|
||||||
|
/* This is really a sanity check, if the "arena" size shrinks then
|
||||||
|
this is a good thing and we can update the test. If it grows
|
||||||
|
then we can also update the test, but we need a good reason. */
|
||||||
|
TEST_ASSERT_INT_WITHIN_MESSAGE(1000, 15000, info.arena, "Initial allocated heap should be approximately 15kB. SEE COMMENT.");
|
||||||
|
|
||||||
|
uint32_t freeheap = xPortGetFreeHeapSize();
|
||||||
|
printf("xPortGetFreeHeapSize = %d bytes\n", freeheap);
|
||||||
|
TEST_ASSERT_TRUE_MESSAGE(freeheap > 20000, "Should be at least 20kB free.");
|
||||||
|
|
||||||
|
uint8_t *buf = malloc(8192);
|
||||||
|
/* <-- have to do something with buf or gcc helpfully optimises it out! */
|
||||||
|
memset(buf, 0xEE, 8192);
|
||||||
|
uint32_t after = xPortGetFreeHeapSize();
|
||||||
|
struct mallinfo after_info = mallinfo();
|
||||||
|
printf("after arena size = %d bytes\n", after_info.arena);
|
||||||
|
printf("after xPortGetFreeHeapSize = %d bytes\n", after);
|
||||||
|
TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, info.arena+8192, after_info.arena, "Allocated heap 'after' size should be 8kB more than before");
|
||||||
|
TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap-8192, after, "Free heap size should be 8kB less than before");
|
||||||
|
|
||||||
|
free(buf);
|
||||||
|
after = xPortGetFreeHeapSize();
|
||||||
|
printf("after freeing xPortGetFreeHeapSize = %d bytes\n", after);
|
||||||
|
TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap, after, "Free heap size after freeing buffer should be close to initial");
|
||||||
|
TEST_PASS;
|
||||||
|
}
|
|
@ -4,6 +4,12 @@
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include "esp/uart.h"
|
#include "esp/uart.h"
|
||||||
|
|
||||||
|
/* Unity is the framework with test assertions, etc. */
|
||||||
|
#include "unity.h"
|
||||||
|
|
||||||
|
/* Need to explicitly flag once a test has completed successfully. */
|
||||||
|
#define TEST_PASS do { UnityConcludeTest(); while(1) { } } while (0)
|
||||||
|
|
||||||
/* Types of test, defined by hardware requirements */
|
/* Types of test, defined by hardware requirements */
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SOLO, /* Test require "ESP A" only, no other connections */
|
SOLO, /* Test require "ESP A" only, no other connections */
|
||||||
|
@ -11,30 +17,43 @@ typedef enum {
|
||||||
EYORE_TEST, /* Test requires an eyore-test board with onboard STM32F0 */
|
EYORE_TEST, /* Test requires an eyore-test board with onboard STM32F0 */
|
||||||
} testcase_type_t;
|
} testcase_type_t;
|
||||||
|
|
||||||
typedef bool (testcase_fn_t)(void);
|
typedef void (testcase_fn_t)(void);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char *name;
|
char *name;
|
||||||
|
char *file;
|
||||||
|
uint8_t line;
|
||||||
testcase_type_t type;
|
testcase_type_t type;
|
||||||
testcase_fn_t *a_fn;
|
testcase_fn_t *a_fn;
|
||||||
testcase_fn_t *b_fn;
|
testcase_fn_t *b_fn;
|
||||||
} testcase_t;
|
} testcase_t;
|
||||||
|
|
||||||
|
void testcase_register(const testcase_t *ignored);
|
||||||
|
|
||||||
/* Register a test case using these macros. Use DEFINE_SOLO_TESTCASE for single-MCU tests,
|
/* Register a test case using these macros. Use DEFINE_SOLO_TESTCASE for single-MCU tests,
|
||||||
and DEFINE_TESTCASE for all other test types.
|
and DEFINE_TESTCASE for all other test types.
|
||||||
*/
|
*/
|
||||||
#define DEFINE_SOLO_TESTCASE(NAME) \
|
#define DEFINE_SOLO_TESTCASE(NAME) \
|
||||||
static testcase_fn_t a_##NAME; \
|
static testcase_fn_t a_##NAME; \
|
||||||
const __attribute__((section(".testcases.text"))) __attribute__((used)) \
|
_DEFINE_TESTCASE_COMMON(NAME, SOLO, a_##NAME, 0)
|
||||||
testcase_t testcase_##NAME = { .name = #NAME, .type = SOLO, .a_fn = a_##NAME };
|
|
||||||
|
#define DEFINE_TESTCASE(NAME, TYPE) \
|
||||||
|
static testcase_fn_t a_##NAME; \
|
||||||
|
static testcase_fn_t b_##NAME; \
|
||||||
|
_DEFINE_TESTCASE_COMMON(NAME, TYPE, A_##NAME, B_##NAME)
|
||||||
|
|
||||||
|
|
||||||
#define DEFINE_TESTCASE(NAME, TYPE) \
|
#define _DEFINE_TESTCASE_COMMON(NAME, TYPE, A_FN, B_FN) \
|
||||||
static testcase_fn_t a_##NAME; \
|
const __attribute__((section(".testcases.text"))) \
|
||||||
static testcase_fn_t b_##NAME; \
|
testcase_t testcase_##NAME = { .name = #NAME, \
|
||||||
const __attribute__((section(".testcases.text"))) __attribute__((used)) \
|
.file = __FILE__, \
|
||||||
testcase_t testcase_##NAME = { .name = #NAME, .type = TYPE, .a_fn = a_##NAME, .b_fn = b_##NAME };
|
.line = __LINE__, \
|
||||||
|
.type = TYPE, \
|
||||||
|
.a_fn = A_FN, \
|
||||||
|
.b_fn = B_FN, \
|
||||||
|
}; \
|
||||||
|
void __attribute__((constructor)) testcase_ctor_##NAME() { \
|
||||||
|
testcase_register(&testcase_##NAME); \
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -78,9 +78,25 @@ void user_init(void)
|
||||||
type = 'A';
|
type = 'A';
|
||||||
else if (type == 'b')
|
else if (type == 'b')
|
||||||
type = 'B';
|
type = 'B';
|
||||||
printf("Running test case %d as %c (%s %s)\n", case_idx, type, cases_start[case_idx].name, get_requirements_name(cases_start[case_idx].type));
|
const testcase_t *tcase = &cases_start[case_idx];
|
||||||
|
printf("\nRunning test case %d (%s %s) as instance %c \nDefinition at %s:%d\n***\n", case_idx,
|
||||||
|
tcase->name, get_requirements_name(tcase->type), type,
|
||||||
|
tcase->file, tcase->line);
|
||||||
|
Unity.CurrentTestName = tcase->name;
|
||||||
|
Unity.TestFile = tcase->file;
|
||||||
|
Unity.CurrentTestLineNumber = tcase->line;
|
||||||
|
Unity.NumberOfTests = 1;
|
||||||
if(type=='A')
|
if(type=='A')
|
||||||
cases_start[case_idx].a_fn();
|
cases_start[case_idx].a_fn();
|
||||||
else
|
else
|
||||||
cases_start[case_idx].b_fn();
|
cases_start[case_idx].b_fn();
|
||||||
|
TEST_FAIL_MESSAGE("\n\nTest initialisation routine returned without calling TEST_PASS. Buggy test?");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* testcase_register is a no-op function, we just need it so the linker
|
||||||
|
knows to pull in the argument at link time.
|
||||||
|
*/
|
||||||
|
void testcase_register(const testcase_t __attribute__((unused)) *ignored)
|
||||||
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
331
tests/test_runner.py
Executable file
331
tests/test_runner.py
Executable file
|
@ -0,0 +1,331 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
import subprocess
|
||||||
|
import os
|
||||||
|
import serial
|
||||||
|
import threading
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
TEST_RESET_TIMEOUT=0.1
|
||||||
|
TESTCASE_TIMEOUT=10
|
||||||
|
TESTRUNNER_BANNER="esp-open-rtos test runner."
|
||||||
|
|
||||||
|
def main():
|
||||||
|
global verbose
|
||||||
|
args = parse_args()
|
||||||
|
verbose = args.verbose
|
||||||
|
|
||||||
|
if not args.no_flash:
|
||||||
|
flash_image(args.aport)
|
||||||
|
if args.type != 'solo':
|
||||||
|
flash_image(args.bport)
|
||||||
|
|
||||||
|
env = TestEnvironment(args.aport, TestEnvironment.A)
|
||||||
|
cases = env.get_testlist()
|
||||||
|
if args.type != 'solo':
|
||||||
|
env_b = TestEnvironment(args.bport, TestEnvironment.B)
|
||||||
|
cases_b = env_b.get_testlist()
|
||||||
|
if cases != cases_b:
|
||||||
|
raise TestRunnerError("Test cases on units A & B don't match")
|
||||||
|
|
||||||
|
counts = dict((status,0) for status in TestResult.STATUS_NAMES.keys())
|
||||||
|
failures = False
|
||||||
|
for test in cases:
|
||||||
|
res = test.run(env)
|
||||||
|
counts[res.status] += 1
|
||||||
|
failures = failures or res.is_failure()
|
||||||
|
|
||||||
|
print("%20s: %d" % ("Total tests", sum(c for c in counts.values())))
|
||||||
|
print()
|
||||||
|
# print status counts for tests
|
||||||
|
for c in sorted(counts.keys()):
|
||||||
|
print("%20s: %d" % (TestResult.STATUS_NAMES[c], counts[c]))
|
||||||
|
|
||||||
|
sys.exit(1 if failures else 0)
|
||||||
|
|
||||||
|
class TestCase(object):
|
||||||
|
def __init__(self, index, name, case_type):
|
||||||
|
self.name = name
|
||||||
|
self.index = index
|
||||||
|
self.case_type = case_type
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "#%d: %s (%s)" % (self.index, self.name, self.case_type)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return (self.index == other.index
|
||||||
|
and self.name == other.name
|
||||||
|
and self.case_type == other.case_type)
|
||||||
|
|
||||||
|
def run(self, env_a, env_b = None):
|
||||||
|
"""
|
||||||
|
Run the test represented by this instance, against the environment(s) passed in.
|
||||||
|
|
||||||
|
Returns a TestResult
|
||||||
|
"""
|
||||||
|
print("Running test case '%s'..." % self.name)
|
||||||
|
mon_a = env_a.start_testcase(self)
|
||||||
|
mon_b = env_b.start_testcase(self) if env_b else None
|
||||||
|
while True:
|
||||||
|
if mon_a.get_result() and (mon_b is None or mon_b.get_result()):
|
||||||
|
break # all running test environments have finished
|
||||||
|
|
||||||
|
# or, in the case both are running, stop as soon as either environemnt shows a failure
|
||||||
|
try:
|
||||||
|
if mon_a.get_result().is_failure():
|
||||||
|
mon_b.cancel()
|
||||||
|
break
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
if mon_b.get_result().is_failure():
|
||||||
|
mon_a.cancel()
|
||||||
|
break
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
if mon_b is not None:
|
||||||
|
# return whichever result is more severe
|
||||||
|
return max(mon_a.get_result(), mon_b.get_result())
|
||||||
|
else:
|
||||||
|
return mon_a.get_result()
|
||||||
|
|
||||||
|
class TestResult(object):
|
||||||
|
""" Class to wrap a test result code and a message """
|
||||||
|
# Test status flags, higher = more severe
|
||||||
|
CANCELLED = 0
|
||||||
|
SKIPPED = 1
|
||||||
|
PASSED = 2
|
||||||
|
FAILED = 3
|
||||||
|
ERROR = 4
|
||||||
|
|
||||||
|
STATUS_NAMES = {
|
||||||
|
CANCELLED : "Cancelled",
|
||||||
|
SKIPPED : "Skipped",
|
||||||
|
PASSED : "Passed",
|
||||||
|
FAILED : "Failed",
|
||||||
|
ERROR : "Error"
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, status, message):
|
||||||
|
self.status = status
|
||||||
|
self.message = message
|
||||||
|
|
||||||
|
def is_failure(self):
|
||||||
|
return self.status >= TestResult.FAILED
|
||||||
|
|
||||||
|
def __cmp__(self, other):
|
||||||
|
if other is None:
|
||||||
|
return 1
|
||||||
|
return self.status - other.status
|
||||||
|
|
||||||
|
class TestMonitor(object):
|
||||||
|
""" Class to monitor a running test case in a separate thread, defer reporting of the result until it's done.
|
||||||
|
|
||||||
|
Can poll for completion by calling is_done(), read a TestResult via .get_result()
|
||||||
|
"""
|
||||||
|
def __init__(self, port, instance):
|
||||||
|
super(TestMonitor, self).__init__()
|
||||||
|
self._thread = threading.Thread(target=self._monitorThread)
|
||||||
|
self._port = port
|
||||||
|
self._instance = instance
|
||||||
|
self._result = None
|
||||||
|
self._cancelled = False
|
||||||
|
self.output = ""
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
def cancel(self):
|
||||||
|
self._cancelled = True
|
||||||
|
|
||||||
|
def is_done(self):
|
||||||
|
return self._result is not None
|
||||||
|
|
||||||
|
def get_result(self):
|
||||||
|
return self._result
|
||||||
|
|
||||||
|
def _monitorThread(self):
|
||||||
|
self.output = ""
|
||||||
|
start_time = time.time()
|
||||||
|
self._port.timeout = 0.1
|
||||||
|
try:
|
||||||
|
while not self._cancelled and time.time() < start_time + TESTCASE_TIMEOUT:
|
||||||
|
line = self._port.readline().decode("utf-8", "ignore")
|
||||||
|
if line == "":
|
||||||
|
line = "(TIMED OUT)\r\n"
|
||||||
|
self.output += "%s+%4.2fs %s" % (self._instance, time.time()-start_time, line)
|
||||||
|
verbose_print(line.strip())
|
||||||
|
if line.endswith(":PASS\r\n"):
|
||||||
|
self._result = TestResult(TestResult.PASSED, "Test passed.")
|
||||||
|
return
|
||||||
|
elif ":FAIL:" in line:
|
||||||
|
self._result = TestResult(TestResult.FAILED, "Test failed.")
|
||||||
|
return
|
||||||
|
elif line == TESTRUNNER_BANNER:
|
||||||
|
self._result = TestResult(TestResult.ERROR, "Test caused crash and reset.")
|
||||||
|
return
|
||||||
|
if not self._cancelled:
|
||||||
|
self._result = TestResult(TestResult.CANCELLED, "Cancelled")
|
||||||
|
else:
|
||||||
|
self._result = TestResult(TestResult.ERROR, "Test timed out")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self._port.timeout = None
|
||||||
|
|
||||||
|
class TestEnvironment(object):
|
||||||
|
A = "A"
|
||||||
|
B = "B"
|
||||||
|
|
||||||
|
def __init__(self, port, instance):
|
||||||
|
self._name = port
|
||||||
|
self._port = TestSerialPort(port, baudrate=115200)
|
||||||
|
self._instance = instance
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
""" Resets the test board, and waits for the test runner program to start up """
|
||||||
|
self._port.setDTR(False)
|
||||||
|
self._port.setRTS(True)
|
||||||
|
time.sleep(0.05)
|
||||||
|
self._port.flushInput()
|
||||||
|
self._port.setRTS(False)
|
||||||
|
verbose_print("Waiting for test runner startup...")
|
||||||
|
if not self._port.wait_line(lambda line: line == TESTRUNNER_BANNER):
|
||||||
|
raise TestRunnerError("Port %s failed to start test runner" % self._port)
|
||||||
|
|
||||||
|
def get_testlist(self):
|
||||||
|
""" Resets the test board and returns the enumerated list of all supported tests """
|
||||||
|
self.reset()
|
||||||
|
tests = []
|
||||||
|
verbose_print("Enumerating tests...")
|
||||||
|
|
||||||
|
def collect_testcases(line):
|
||||||
|
if line.startswith(">"):
|
||||||
|
return True # prompt means list of test cases is done, success
|
||||||
|
m = re.match(r"CASE (\d+) = (.+?) ([A-Z]+)", line)
|
||||||
|
if m is not None:
|
||||||
|
t = TestCase(int(m.group(1)), m.group(2), m.group(3).lower())
|
||||||
|
verbose_print(t)
|
||||||
|
tests.append(t)
|
||||||
|
if not self._port.wait_line(collect_testcases):
|
||||||
|
raise TestRunnerError("Port %s failed to read test list" % self._port)
|
||||||
|
verbose_print("Port %s found %d test cases" % (self._name, len(tests)))
|
||||||
|
return tests
|
||||||
|
|
||||||
|
def start_testcase(self, case):
|
||||||
|
""" Starts the specified test instance and returns an TestMonitor reader thread instance to monitor the output """
|
||||||
|
# synchronously start the test case
|
||||||
|
self.reset()
|
||||||
|
if not self._port.wait_line(lambda line: line.startswith(">")):
|
||||||
|
raise TestRunnerError("Failed to read test runnner prompt")
|
||||||
|
command = "%s%d\r\n" % (self._instance, case.index)
|
||||||
|
self._port.write(command.encode("utf-8"))
|
||||||
|
return TestMonitor(self._port, self._instance)
|
||||||
|
|
||||||
|
|
||||||
|
def get_testdir():
|
||||||
|
"""
|
||||||
|
Return the 'tests' directory in the source tree
|
||||||
|
(assuming the test_runner.py script is in that directory.
|
||||||
|
"""
|
||||||
|
res = os.path.dirname(__name__)
|
||||||
|
return "." if res == "" else res
|
||||||
|
|
||||||
|
|
||||||
|
def flash_image(serial_port):
|
||||||
|
# Bit hacky: rather than calling esptool directly, just use the Makefile flash target
|
||||||
|
# with the correct ESPPORT argument
|
||||||
|
env = dict(os.environ)
|
||||||
|
env["ESPPORT"] = serial_port
|
||||||
|
verbose_print("Building and flashing test image to %s..." % serial_port)
|
||||||
|
try:
|
||||||
|
stdout = sys.stdout if verbose else None
|
||||||
|
output = subprocess.run(["make","flash"], check=True, cwd=get_testdir(), stdout=stdout, stderr=subprocess.STDOUT, env=env)
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
raise TestRunnerError("'make flash EPPORT=%s' failed with exit code %d" % (serial_port, e.returncode))
|
||||||
|
verbose_print("Flashing successful.")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
parser = argparse.ArgumentParser(description='esp-open-rtos testrunner', prog='test_runner')
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'--type', '-t',
|
||||||
|
help='Type of test hardware attached to serial ports A & (optionally) B',
|
||||||
|
choices=['solo','dual','eyore_test'], default='solo')
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'--aport', '-a',
|
||||||
|
help='Serial port for device A',
|
||||||
|
default='/dev/ttyUSB0')
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'--bport', '-b',
|
||||||
|
help='Serial port for device B (ignored if type is \'solo\')',
|
||||||
|
default='/dev/ttyUSB1')
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'--no-flash', '-n',
|
||||||
|
help='Don\'t flash the test binary image before running tests',
|
||||||
|
action='store_true',
|
||||||
|
default=False)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'--verbose', '-v',
|
||||||
|
help='Verbose test runner debugging output',
|
||||||
|
action='store_true',
|
||||||
|
default=False)
|
||||||
|
|
||||||
|
parser.add_argument('testcases', nargs='*',
|
||||||
|
help='Optional list of test cases to run. By default, all tests are run.')
|
||||||
|
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
class TestRunnerError(RuntimeError):
|
||||||
|
def __init__(self, message):
|
||||||
|
RuntimeError.__init__(self, message)
|
||||||
|
|
||||||
|
class TestSerialPort(serial.Serial):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(TestSerialPort, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def wait_line(self, callback, timeout = TEST_RESET_TIMEOUT):
|
||||||
|
""" Wait for the port to output a particular piece of line content, as judged by callback
|
||||||
|
|
||||||
|
Callback called as 'callback(line)' and returns not-True if non-match otherwise can return any value.
|
||||||
|
|
||||||
|
Returns first non-False result from the callback, or None if it timed out waiting for a new line.
|
||||||
|
|
||||||
|
Note that a serial port spewing legitimate lines of output may block this function forever, if callback
|
||||||
|
doesn't detect this is happening.
|
||||||
|
"""
|
||||||
|
self.timeout = timeout
|
||||||
|
try:
|
||||||
|
res = None
|
||||||
|
while not res:
|
||||||
|
line = self.readline()
|
||||||
|
if line == b"":
|
||||||
|
break # timed out
|
||||||
|
line = line.decode("utf-8", "ignore").rstrip()
|
||||||
|
res = callback(line)
|
||||||
|
return res
|
||||||
|
finally:
|
||||||
|
self.timeout = None
|
||||||
|
|
||||||
|
verbose = False
|
||||||
|
|
||||||
|
|
||||||
|
def verbose_print(msg):
|
||||||
|
if verbose:
|
||||||
|
print(msg)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
try:
|
||||||
|
main()
|
||||||
|
except TestRunnerError as e:
|
||||||
|
print(e)
|
||||||
|
sys.exit(2)
|
||||||
|
|
Loading…
Reference in a new issue