Unit testing for esp-open-rtos (#253)

* Get testing system by projectgus working with master HEAD

* Fix running dual test. Add basic wifi test

* Moved spiff test to a test case. Reset retries in test runner

* Add timers test case

* test_runner: List test cases and run individual test cases

* Add README for tests

* Update README.md

* Code clean-up

* Python3.4 support. README.md update
This commit is contained in:
sheinz 2016-11-05 22:12:14 +02:00 committed by GitHub
parent dda384f3a1
commit 7c702d7f09
19 changed files with 1562 additions and 45 deletions

1
.gitignore vendored
View file

@ -10,3 +10,4 @@ firmware
local.mk
local.h
screenlog.*
*.swp

7
.gitmodules vendored
View file

@ -13,6 +13,9 @@
[submodule "extras/spiffs/spiffs"]
path = extras/spiffs/spiffs
url = https://github.com/pellepl/spiffs.git
[submodule "examples/posix_fs/fs-test"]
path = examples/posix_fs/fs-test
[submodule "tests/unity"]
path = tests/unity
url = https://github.com/ThrowTheSwitch/Unity.git
[submodule "tests/fs-test"]
path = tests/fs-test
url = https://github.com/sheinz/fs-test

View file

@ -136,7 +136,7 @@ $$($(1)_OBJ_DIR)%.o: $$($(1)_REAL_ROOT)%.S $$($(1)_MAKEFILE) $(wildcard $(ROOT)*
$(1)_AR_IN_FILES = $$($(1)_OBJ_FILES)
# the component is shown to depend on both obj and source files so we get
# the component is shown to depend on both obj and source files so we get
# a meaningful error message for missing explicitly named source files
ifeq ($(INCLUDE_SRC_IN_AR),1)
$(1)_AR_IN_FILES += $$($(1)_SRC_FILES)

View file

@ -1,11 +0,0 @@
PROGRAM=posix_fs_example
PROGRAM_EXTRA_SRC_FILES=./fs-test/fs_test.c
EXTRA_COMPONENTS = extras/spiffs
FLASH_SIZE = 32
# spiffs configuration
SPIFFS_BASE_ADDR = 0x200000
SPIFFS_SIZE = 0x100000
include ../../common.mk

View file

@ -1,10 +0,0 @@
# POSIX file access example
This example runs several file system tests on ESP8266.
It uses fs-test library to perform file operations test. fs-test library uses
only POSIX file functions so can be run on host system as well.
Currently included tests:
* File system load test. Perform multiple file operations in random order.
* File system speed test. Measures files read/write speed.

@ -1 +0,0 @@
Subproject commit 2ad547adc5f725594b3c6752f036ff4401b221fc

27
tests/Makefile Normal file
View file

@ -0,0 +1,27 @@
PROGRAM=tests
EXTRA_COMPONENTS=extras/dhcpserver extras/spiffs
PROGRAM_SRC_DIR = . ./cases
FLASH_SIZE = 32
# spiffs configuration
SPIFFS_BASE_ADDR = 0x200000
SPIFFS_SIZE = 0x100000
# Add unity test framework headers & core source file
PROGRAM_INC_DIR = ./unity/src ./fs-test
PROGRAM_EXTRA_SRC_FILES = ./unity/src/unity.c ./fs-test/fs_test.c
TESTCASE_SRC_FILES = $(wildcard $(PROGRAM_DIR)cases/*.c)
# Do not include source files into a static library because when adding this
# library with '--whole-archive' linker gives error that archive contains
# unknown objects (source files)
INCLUDE_SRC_IN_AR = 0
# Link every object in the 'program' archive, to pick up constructor functions for test cases
EXTRA_LDFLAGS = -Wl,--whole-archive $(BUILD_DIR)program.a -Wl,--no-whole-archive
include ../common.mk

66
tests/README.md Normal file
View file

@ -0,0 +1,66 @@
# esp-open-rtos tests
Testing is based on [Unity](https://github.com/ThrowTheSwitch/Unity)
C testing framework.
## Features
* Single device test case.
* Dual devices test cases. Run test case on two ESP8266 modules simultaneously.
* Run only specified test cases.
* List available test cases on a device.
## Usage
There's a test runner script `test_runner.py` written in Python3 that runs
test cases on ESP8266 connected to a host.
### Requirements and dependencies
* Python3 version > 3.4 `sudo apt-get install python3 python3-pip`
* pyserial `sudo pip3 install pyserial`
* ESP8266 board with reset to boot mode support
* Two ESP8266 for dual mode test cases
Test runner is heavily relying on device reset using DTR and RTS signals.
Popular ESP8266 boards such as **NodeMcu** and **Wemos D1** support device
reset into flash mode.
### Options
`--type` or `-t` - Type of test case to run. Can be 'solo' or 'dual'.
If not specified 'solo' test will be run.
`--aport` or `-a` - Serial port for device A.
If not specified device `/dev/ttyUSB0` is used.
`--bport` or `-b` - Serial port for device B.
If not specified device `/dev/ttyUSB1` is used.
`--no-flash` or `-n` - Do not flash the test firmware before running tests.
`--list` or `-l` - Display list of the available test cases on the device.
### Example
Build test firmware, flash it using serial device `/dev/tty.wchusbserial1410`
and run only *solo* test cases:
`./test_runner.py -a /dev/tty.wchusbserial1410`
Build test firmware. Flash both devices as `-t dual` is specified. And run both
*solo* and *dual* test cases:
`./test_runner.py -a /dev/tty.wchusbserial1410 -b /dev/tty.wchusbserial1420 -t dual`
Do not flash the firmware, only display available test cases on the device:
`./test_runner.py -a /dev/tty.wchusbserial1410 -n -l`
Do not flash the firmware and run only 2 and 4 test cases:
`./test_runner.py -a /dev/tty.wchusbserial1410 -n 2 4`
## References
[Unity](https://github.com/ThrowTheSwitch/Unity) - Simple Unit Testing for C

View file

@ -0,0 +1,71 @@
#include "testcase.h"
#include <FreeRTOS.h>
#include <task.h>
#include <esp/uart.h>
/* Basic test cases to validate the FreeRTOS scheduler works */
DEFINE_SOLO_TESTCASE(01_scheduler_basic)
DEFINE_SOLO_TESTCASE(01_scheduler_priorities)
void set_variable(void *pvParameters)
{
bool *as_bool = (bool *)pvParameters;
*as_bool = true;
/* deliberately a busywait at the end, not vTaskSuspend, to test priorities */
while(1) { }
}
/* Really simple - do created tasks run? */
static void a_01_scheduler_basic()
{
volatile bool a = false, b = false, c = false;
printf("top of scheduler...\n");
uart_flush_txfifo(0);
xTaskCreate(set_variable, "set_a", 128, (void *)&a, tskIDLE_PRIORITY, NULL);
xTaskCreate(set_variable, "set_b", 128, (void *)&b, tskIDLE_PRIORITY, NULL);
xTaskCreate(set_variable, "set_c", 128, (void *)&c, tskIDLE_PRIORITY, NULL);
TEST_ASSERT_FALSE_MESSAGE(a, "task set_a shouldn't run yet");
TEST_ASSERT_FALSE_MESSAGE(b, "task set_b shouldn't run yet");
TEST_ASSERT_FALSE_MESSAGE(c, "task set_c shouldn't run yet");
vTaskDelay(5);
TEST_ASSERT_TRUE_MESSAGE(a, "task set_a should have run");
TEST_ASSERT_TRUE_MESSAGE(b, "task set_b should have run");
TEST_ASSERT_TRUE_MESSAGE(c, "task set_c should have run");
TEST_PASS();
}
/* Verify that a high-priority task will starve a lower priority task */
static void a_01_scheduler_priorities()
{
/* Increase priority of the init task to make sure it always takes priority */
vTaskPrioritySet(xTaskGetCurrentTaskHandle(), tskIDLE_PRIORITY+4);
bool lower = false, higher = false;
TaskHandle_t task_lower, task_higher;
xTaskCreate(set_variable, "high_prio", 128, (void *)&higher, tskIDLE_PRIORITY+1, &task_higher);
xTaskCreate(set_variable, "low_prio", 128, (void *)&lower, tskIDLE_PRIORITY, &task_lower);
TEST_ASSERT_FALSE_MESSAGE(higher, "higher prio task should not have run yet");
TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run yet");
vTaskDelay(2);
TEST_ASSERT_TRUE_MESSAGE(higher, "higher prio task should have run");
TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should not have run");
/* Bump lower priority task over higher priority task */
vTaskPrioritySet(task_lower, tskIDLE_PRIORITY+2);
TEST_ASSERT_FALSE_MESSAGE(lower, "lower prio task should still not have run yet");
vTaskDelay(1);
TEST_ASSERT_TRUE_MESSAGE(lower, "lower prio task should have run");
TEST_PASS();
}

56
tests/cases/02_heap.c Normal file
View file

@ -0,0 +1,56 @@
#include "testcase.h"
#include <malloc.h>
#include <string.h>
#include <FreeRTOS.h>
DEFINE_SOLO_TESTCASE(02_heap_simple)
DEFINE_SOLO_TESTCASE(02_heap_full)
/* Simple heap accounting tests */
static void a_02_heap_simple()
{
struct mallinfo info = mallinfo();
printf("'arena' allocation size %d bytes\n", info.arena);
/* This is really a sanity check, if the "arena" size shrinks then
this is a good thing and we can update the test. If it grows
then we can also update the test, but we need a good reason. */
TEST_ASSERT_INT_WITHIN_MESSAGE(1000, 15000, info.arena, "Initial allocated heap should be approximately 15kB. SEE COMMENT.");
uint32_t freeheap = xPortGetFreeHeapSize();
printf("xPortGetFreeHeapSize = %d bytes\n", freeheap);
TEST_ASSERT_TRUE_MESSAGE(freeheap > 20000, "Should be at least 20kB free.");
uint8_t *buf = malloc(8192);
/* <-- have to do something with buf or gcc helpfully optimises it out! */
memset(buf, 0xEE, 8192);
uint32_t after = xPortGetFreeHeapSize();
struct mallinfo after_info = mallinfo();
printf("after arena size = %d bytes\n", after_info.arena);
printf("after xPortGetFreeHeapSize = %d bytes\n", after);
TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, info.arena+8192, after_info.arena, "Allocated heap 'after' size should be 8kB more than before");
TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap-8192, after, "Free heap size should be 8kB less than before");
free(buf);
after = xPortGetFreeHeapSize();
printf("after freeing xPortGetFreeHeapSize = %d bytes\n", after);
TEST_ASSERT_UINT32_WITHIN_MESSAGE(100, freeheap, after, "Free heap size after freeing buffer should be close to initial");
TEST_PASS();
}
/* Ensure malloc behaves when out of memory */
static void a_02_heap_full()
{
void *x = malloc(65536);
TEST_ASSERT_NULL_MESSAGE(x, "Allocating 64kB should fail and return null");
void *y = malloc(32768);
TEST_ASSERT_NOT_NULL_MESSAGE(y, "Allocating 32kB should succeed");
void *z = malloc(32768);
TEST_ASSERT_NULL_MESSAGE(z, "Allocating second 32kB should fail");
free(y);
z = malloc(32768);
TEST_ASSERT_NOT_NULL_MESSAGE(z, "Allocating 32kB should succeed after first block freed");
TEST_PASS();
}

View file

@ -0,0 +1,481 @@
/**
* Unit tests to verify the "unaligned load handler" in core/exception_vectors.S
* that allows us to complete byte loads from unaligned memory, etc.
*
* Adapted from a test program in 'experiments' that did this.
*/
#include "testcase.h"
#include "esp/rom.h"
#include "esp/timer.h"
#include "esp/uart.h"
#include "espressif/esp_common.h"
#include "xtensa_ops.h"
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include "string.h"
#include "strings.h"
#include <malloc.h>
#define TESTSTRING "O hai there! %d %d %d"
static char dramtest[] = TESTSTRING;
static const __attribute__((section(".iram1.notrodata")))
char iramtest[] = TESTSTRING;
static const __attribute__((section(".text.notrodata")))
char iromtest[] = TESTSTRING;
static const volatile __attribute__((section(".iram1.notliterals")))
int16_t unsigned_shorts[] = { -3, -4, -5, -32767, 44 };
static const __attribute__((section(".iram1.notrodata")))
char sanity_test_data[] = {
0x01, 0x55, 0x7e, 0x2a, 0x81, 0xd5, 0xfe, 0xaa
};
DEFINE_SOLO_TESTCASE(03_byte_load_verify_sections)
#define PTR_IN_REGION(PTR, START, LEN) \
((START <= (intptr_t)(PTR)) && ((intptr_t)(PTR) < (START+LEN)))
/* Sanity check, ensure the addresses of the various test strings
* are in the correct address space regions. */
static void a_03_byte_load_verify_sections()
{
printf("dramtest addr %p\n", dramtest);
TEST_ASSERT_MESSAGE(PTR_IN_REGION(dramtest, 0x3FFE8000, 0x14000),
"dramtest should be in DRAM region");
printf("iramtest addr %p\n", iramtest);
TEST_ASSERT_MESSAGE(PTR_IN_REGION(iramtest, 0x40100000, 0x8000),
"iramtest should be in IRAM region");
printf("iromtest addr %p\n", iromtest);
TEST_ASSERT_MESSAGE(PTR_IN_REGION(iromtest, 0x40202010, (0x100000 - 0x2010)),
"iromtest sohuld be in IROM region");
printf("unsigned_shorts addr %p\n", unsigned_shorts);
TEST_ASSERT_MESSAGE(PTR_IN_REGION(unsigned_shorts, 0x40100000, 0x8000),
"unsigned_shorts should be in IRAM region");
printf("sanity_test_data addr %p\n", sanity_test_data);
TEST_ASSERT_MESSAGE(PTR_IN_REGION(sanity_test_data, 0x40100000, 0x8000),
"sanity_test_data should be in IRAM region");
TEST_PASS();
}
/* test utility functions used for '03_byte_load_test_strings'
returns the expected string result */
typedef const char *(* test_with_fn_t)(const char *string);
static char buf[64];
static const char * test_memcpy_aligned(const char *string)
{
memcpy(buf, string, 16);
return "O hai there! %d ";
}
static const char * test_memcpy_unaligned(const char *string)
{
memcpy(buf, string, 15);
return "O hai there! %d";
}
static const char * test_memcpy_unaligned2(const char *string)
{
memcpy(buf, string+1, 15);
return " hai there! %d ";
}
static const char * test_strcpy(const char *string)
{
strcpy(buf, string);
return dramtest;
}
static const char * test_sprintf(const char *string)
{
sprintf(buf, string, 1, 2, 3);
return "O hai there! 1 2 3";
}
static const char * test_sprintf_arg(const char *string)
{
sprintf(buf, "%s", string);
return dramtest;
}
static const char * test_naive_strcpy(const char *string)
{
char *to = buf;
while((*to++ = *string++))
;
return dramtest;
}
static const char * test_naive_strcpy_a0(const char *string)
{
asm volatile (
" mov a8, %0 \n"
" mov a9, %1 \n"
"tns_loop%=: l8ui a0, a9, 0 \n"
" addi.n a9, a9, 1 \n"
" s8i a0, a8, 0 \n"
" addi.n a8, a8, 1 \n"
" bnez a0, tns_loop%=\n"
: : "r" (buf), "r" (string) : "a0", "a8", "a9");
return dramtest;
}
static const char * test_naive_strcpy_a2(const char *string)
{
asm volatile (
" mov a8, %0 \n"
" mov a9, %1 \n"
"tns_loop%=: l8ui a2, a9, 0 \n"
" addi.n a9, a9, 1 \n"
" s8i a2, a8, 0 \n"
" addi.n a8, a8, 1 \n"
" bnez a2, tns_loop%=\n"
: : "r" (buf), "r" (string) : "a2", "a8", "a9");
return dramtest;
}
static const char * test_naive_strcpy_a3(const char *string)
{
asm volatile (
" mov a8, %0 \n"
" mov a9, %1 \n"
"tns_loop%=: l8ui a3, a9, 0 \n"
" addi.n a9, a9, 1 \n"
" s8i a3, a8, 0 \n"
" addi.n a8, a8, 1 \n"
" bnez a3, tns_loop%=\n"
: : "r" (buf), "r" (string) : "a3", "a8", "a9");
return TESTSTRING;
}
static const char * test_naive_strcpy_a4(const char *string)
{
asm volatile (
" mov a8, %0 \n"
" mov a9, %1 \n"
"tns_loop%=: l8ui a4, a9, 0 \n"
" addi.n a9, a9, 1 \n"
" s8i a4, a8, 0 \n"
" addi.n a8, a8, 1 \n"
" bnez a4, tns_loop%=\n"
: : "r" (buf), "r" (string) : "a4", "a8", "a9");
return TESTSTRING;
}
static const char * test_naive_strcpy_a5(const char *string)
{
asm volatile (
" mov a8, %0 \n"
" mov a9, %1 \n"
"tns_loop%=: l8ui a5, a9, 0 \n"
" addi.n a9, a9, 1 \n"
" s8i a5, a8, 0 \n"
" addi.n a8, a8, 1 \n"
" bnez a5, tns_loop%=\n"
: : "r" (buf), "r" (string) : "a5", "a8", "a9");
return TESTSTRING;
}
static const char * test_naive_strcpy_a6(const char *string)
{
asm volatile (
" mov a8, %0 \n"
" mov a9, %1 \n"
"tns_loop%=: l8ui a6, a9, 0 \n"
" addi.n a9, a9, 1 \n"
" s8i a6, a8, 0 \n"
" addi.n a8, a8, 1 \n"
" bnez a6, tns_loop%=\n"
: : "r" (buf), "r" (string) : "a6", "a8", "a9");
return TESTSTRING;
}
static const char * test_noop(const char *string)
{
buf[0] = 0;
return "";
}
static uint32_t IRAM inner_string_test(const char *string, test_with_fn_t testfn, const char *testfn_label, uint32_t nullvalue, bool evict_cache)
{
printf(" .. against %30s: ", testfn_label);
vPortEnterCritical();
uint32_t before;
RSR(before, CCOUNT);
const int TEST_REPEATS = 1000;
for(int i = 0; i < TEST_REPEATS; i++) {
memset(buf, 0, sizeof(buf));
const char *expected = testfn(string);
TEST_ASSERT_EQUAL_STRING_MESSAGE(expected, buf, testfn_label);
if(evict_cache) {
Cache_Read_Disable();
Cache_Read_Enable(0,0,1);
}
}
uint32_t after;
RSR(after, CCOUNT);
vPortExitCritical();
uint32_t instructions = (after-before)/TEST_REPEATS - nullvalue;
printf("%5d instructions\r\n", instructions);
return instructions;
}
static void string_test(const char *string, char *label, bool evict_cache)
{
printf("Testing %s (%p) '%s'\r\n", label, string, string);
printf("Formats as: '");
printf(string, 1, 2, 3);
printf("'\r\n");
uint32_t nullvalue = inner_string_test(string, test_noop, "null op", 0, evict_cache);
inner_string_test(string, test_memcpy_aligned, "memcpy - aligned len", nullvalue, evict_cache);
inner_string_test(string, test_memcpy_unaligned, "memcpy - unaligned len", nullvalue, evict_cache);
inner_string_test(string, test_memcpy_unaligned2, "memcpy - unaligned start&len", nullvalue, evict_cache);
inner_string_test(string, test_strcpy, "strcpy", nullvalue, evict_cache);
inner_string_test(string, test_naive_strcpy, "naive strcpy", nullvalue, evict_cache);
inner_string_test(string, test_naive_strcpy_a0, "naive strcpy (a0)", nullvalue, evict_cache);
inner_string_test(string, test_naive_strcpy_a2, "naive strcpy (a2)", nullvalue, evict_cache);
inner_string_test(string, test_naive_strcpy_a3, "naive strcpy (a3)", nullvalue, evict_cache);
inner_string_test(string, test_naive_strcpy_a4, "naive strcpy (a4)", nullvalue, evict_cache);
inner_string_test(string, test_naive_strcpy_a5, "naive strcpy (a5)", nullvalue, evict_cache);
inner_string_test(string, test_naive_strcpy_a6, "naive strcpy (a6)", nullvalue, evict_cache);
inner_string_test(string, test_sprintf, "sprintf", nullvalue, evict_cache);
inner_string_test(string, test_sprintf_arg, "sprintf format arg", nullvalue, evict_cache);
}
DEFINE_SOLO_TESTCASE(03_byte_load_test_strings)
/* Test various operations on strings in various regions */
static void a_03_byte_load_test_strings()
{
string_test(dramtest, "DRAM", 0);
string_test(iramtest, "IRAM", 0);
string_test(iromtest, "Cached flash", 0);
string_test(iromtest, "'Uncached' flash", 1);
TEST_PASS();
}
static volatile bool frc1_ran;
static volatile bool frc1_finished;
static volatile char frc1_buf[80];
DEFINE_SOLO_TESTCASE(03_byte_load_test_isr)
static void frc1_interrupt_handler(void)
{
frc1_ran = true;
timer_set_run(FRC1, false);
strcpy((char *)frc1_buf, iramtest);
frc1_finished = true;
}
/* Verify that the unaligned loader can run inside an ISR */
static void a_03_byte_load_test_isr()
{
printf("Testing behaviour inside ISRs...\r\n");
timer_set_interrupts(FRC1, false);
timer_set_run(FRC1, false);
_xt_isr_attach(INUM_TIMER_FRC1, frc1_interrupt_handler);
timer_set_frequency(FRC1, 1000);
timer_set_interrupts(FRC1, true);
timer_set_run(FRC1, true);
sdk_os_delay_us(2000);
if(!frc1_ran)
TEST_FAIL_MESSAGE("ERROR: FRC1 timer exception never fired.\r\n");
else if(!frc1_finished)
TEST_FAIL_MESSAGE("ERROR: FRC1 timer exception never finished.\r\n");
else if(strcmp((char *)frc1_buf, iramtest))
TEST_FAIL_MESSAGE("ERROR: FRC1 strcpy from IRAM failed.\r\n");
else
TEST_PASS();
}
DEFINE_SOLO_TESTCASE(03_byte_load_test_sign_extension)
static void a_03_byte_load_test_sign_extension()
{
/* this step seems to be necessary so the compiler will actually generate l16si */
int16_t *shorts_p = (int16_t *)unsigned_shorts;
if(shorts_p[0] == -3 && shorts_p[1] == -4 && shorts_p[2] == -5 && shorts_p[3] == -32767 && shorts_p[4] == 44)
{
TEST_PASS();
} else {
sprintf(buf, "l16si sign extension failed. Got values %d %d %d %d %d\r\n", shorts_p[0], shorts_p[1], shorts_p[2], shorts_p[3], shorts_p[4]);
TEST_FAIL_MESSAGE(buf);
}
}
/* test that running unaligned loads in a running FreeRTOS system doesn't break things
The following tests run inside a FreeRTOS task, after everything else.
*/
DEFINE_SOLO_TESTCASE(03_byte_load_test_system_interaction);
static void task_load_test_system_interaction()
{
uint32_t start = xTaskGetTickCount();
printf("Starting system/timer interaction test (takes approx 1 second)...\n");
for(int i = 0; i < 5000; i++) {
test_naive_strcpy_a0(iromtest);
test_naive_strcpy_a2(iromtest);
test_naive_strcpy_a3(iromtest);
test_naive_strcpy_a4(iromtest);
test_naive_strcpy_a5(iromtest);
test_naive_strcpy_a6(iromtest);
/*
const volatile char *string = iromtest;
volatile char *to = dest;
while((*to++ = *string++))
;
*/
}
uint32_t ticks = xTaskGetTickCount() - start;
printf("Timer interaction test PASSED after %d ticks.\n", ticks);
TEST_PASS();
}
static void a_03_byte_load_test_system_interaction()
{
xTaskCreate(task_load_test_system_interaction, "interactionTask", 256, NULL, 2, NULL);
while(1) {
vTaskDelay(100);
}
}
/* The following "sanity tests" are designed to try to execute every code path
* of the LoadStoreError handler, with a variety of offsets and data values
* designed to catch any mask/shift errors, sign-extension bugs, etc */
DEFINE_SOLO_TESTCASE(03_byte_load_test_sanity)
/* (Contrary to expectations, 'mov a15, a15' in Xtensa is not technically a
* no-op, but is officially "undefined and reserved for future use", so we need
* a special case in the case where reg == "a15" so we don't end up generating
* those opcodes. GCC is smart enough to optimize away the whole conditional
* and just insert the correct asm block, since `reg` is a static argument.) */
#define LOAD_VIA_REG(op, reg, addr, var) \
if (strcmp(reg, "a15")) { \
asm volatile ( \
"mov a15, " reg "\n\t" \
op " " reg ", %1, 0\n\t" \
"mov %0, " reg "\n\t" \
"mov " reg ", a15\n\t" \
: "=r" (var) : "r" (addr) : "a15" ); \
} else { \
asm volatile ( \
op " " reg ", %1, 0\n\t" \
"mov %0, " reg "\n\t" \
: "=r" (var) : "r" (addr) : "a15" ); \
}
#define TEST_LOAD(op, reg, addr, value) \
{ \
int32_t result; \
LOAD_VIA_REG(op, reg, addr, result); \
if (result != value) sanity_test_failed(op, reg, addr, value, result); \
}
static void sanity_test_failed(const char *testname, const char *reg, const void *addr, int32_t value, int32_t result) {
uint32_t actual_data = *(uint32_t *)((uint32_t)addr & 0xfffffffc);
sprintf(buf, "%s %s from %p (32-bit value: 0x%x): Expected 0x%08x (%d), got 0x%08x (%d)\n", testname, reg, addr, actual_data, value, value, result, result);
TEST_FAIL_MESSAGE(buf);
}
static void sanity_test_l8ui(const void *addr, int32_t value) {
TEST_LOAD("l8ui", "a0", addr, value);
TEST_LOAD("l8ui", "a1", addr, value);
TEST_LOAD("l8ui", "a2", addr, value);
TEST_LOAD("l8ui", "a3", addr, value);
TEST_LOAD("l8ui", "a4", addr, value);
TEST_LOAD("l8ui", "a5", addr, value);
TEST_LOAD("l8ui", "a6", addr, value);
TEST_LOAD("l8ui", "a7", addr, value);
TEST_LOAD("l8ui", "a8", addr, value);
TEST_LOAD("l8ui", "a9", addr, value);
TEST_LOAD("l8ui", "a10", addr, value);
TEST_LOAD("l8ui", "a11", addr, value);
TEST_LOAD("l8ui", "a12", addr, value);
TEST_LOAD("l8ui", "a13", addr, value);
TEST_LOAD("l8ui", "a14", addr, value);
TEST_LOAD("l8ui", "a15", addr, value);
}
static void sanity_test_l16ui(const void *addr, int32_t value) {
TEST_LOAD("l16ui", "a0", addr, value);
TEST_LOAD("l16ui", "a1", addr, value);
TEST_LOAD("l16ui", "a2", addr, value);
TEST_LOAD("l16ui", "a3", addr, value);
TEST_LOAD("l16ui", "a4", addr, value);
TEST_LOAD("l16ui", "a5", addr, value);
TEST_LOAD("l16ui", "a6", addr, value);
TEST_LOAD("l16ui", "a7", addr, value);
TEST_LOAD("l16ui", "a8", addr, value);
TEST_LOAD("l16ui", "a9", addr, value);
TEST_LOAD("l16ui", "a10", addr, value);
TEST_LOAD("l16ui", "a11", addr, value);
TEST_LOAD("l16ui", "a12", addr, value);
TEST_LOAD("l16ui", "a13", addr, value);
TEST_LOAD("l16ui", "a14", addr, value);
TEST_LOAD("l16ui", "a15", addr, value);
}
static void sanity_test_l16si(const void *addr, int32_t value) {
TEST_LOAD("l16si", "a0", addr, value);
TEST_LOAD("l16si", "a1", addr, value);
TEST_LOAD("l16si", "a2", addr, value);
TEST_LOAD("l16si", "a3", addr, value);
TEST_LOAD("l16si", "a4", addr, value);
TEST_LOAD("l16si", "a5", addr, value);
TEST_LOAD("l16si", "a6", addr, value);
TEST_LOAD("l16si", "a7", addr, value);
TEST_LOAD("l16si", "a8", addr, value);
TEST_LOAD("l16si", "a9", addr, value);
TEST_LOAD("l16si", "a10", addr, value);
TEST_LOAD("l16si", "a11", addr, value);
TEST_LOAD("l16si", "a12", addr, value);
TEST_LOAD("l16si", "a13", addr, value);
TEST_LOAD("l16si", "a14", addr, value);
TEST_LOAD("l16si", "a15", addr, value);
}
static void a_03_byte_load_test_sanity(void) {
printf("== Performing sanity tests (sanity_test_data @ %p)...\n", sanity_test_data);
sanity_test_l8ui(sanity_test_data + 0, 0x01);
sanity_test_l8ui(sanity_test_data + 1, 0x55);
sanity_test_l8ui(sanity_test_data + 2, 0x7e);
sanity_test_l8ui(sanity_test_data + 3, 0x2a);
sanity_test_l8ui(sanity_test_data + 4, 0x81);
sanity_test_l8ui(sanity_test_data + 5, 0xd5);
sanity_test_l8ui(sanity_test_data + 6, 0xfe);
sanity_test_l8ui(sanity_test_data + 7, 0xaa);
sanity_test_l16ui(sanity_test_data + 0, 0x5501);
sanity_test_l16ui(sanity_test_data + 2, 0x2a7e);
sanity_test_l16ui(sanity_test_data + 4, 0xd581);
sanity_test_l16ui(sanity_test_data + 6, 0xaafe);
sanity_test_l16si(sanity_test_data + 0, 0x5501);
sanity_test_l16si(sanity_test_data + 2, 0x2a7e);
sanity_test_l16si(sanity_test_data + 4, -10879);
sanity_test_l16si(sanity_test_data + 6, -21762);
printf("== Sanity tests completed.\n");
TEST_PASS();
}

181
tests/cases/04_wifi_basic.c Normal file
View file

@ -0,0 +1,181 @@
/**
* This test verifies basic WiFi communication.
*
* Device A creates a WiFi access point and listens on port 23 for incomming
* connection. When incomming connection occurs it sends a string and waits
* for the response.
*
* Device B connects to a WiFi access point and opens TCP connection to
* device A on port 23. Then it receives a string and sends it back.
*/
#include <string.h>
#include <FreeRTOS.h>
#include <task.h>
#include <espressif/esp_common.h>
#include "sdk_internal.h"
#include <lwip/api.h>
#include <lwip/err.h>
#include <lwip/sockets.h>
#include <lwip/sys.h>
#include <lwip/netdb.h>
#include <lwip/dns.h>
#include <dhcpserver.h>
#include "testcase.h"
#define AP_SSID "esp-open-rtos-ap"
#define AP_PSK "esp-open-rtos"
#define SERVER "172.16.0.1"
#define PORT 23
#define BUF_SIZE 128
DEFINE_TESTCASE(04_wifi_basic, DUAL)
/*********************************************************
* WiFi AP part
*********************************************************/
static void server_task(void *pvParameters)
{
char buf[BUF_SIZE];
struct netconn *nc = netconn_new(NETCONN_TCP);
TEST_ASSERT_TRUE_MESSAGE(nc != 0, "Failed to allocate socket");
netconn_bind(nc, IP_ADDR_ANY, PORT);
netconn_listen(nc);
struct netconn *client = NULL;
err_t err = netconn_accept(nc, &client);
TEST_ASSERT_TRUE_MESSAGE(err == ERR_OK, "Error accepting connection");
ip_addr_t client_addr;
uint16_t port_ignore;
netconn_peer(client, &client_addr, &port_ignore);
snprintf(buf, sizeof(buf), "test ping\r\n");
printf("Device A: send data: %s\n", buf);
netconn_write(client, buf, strlen(buf), NETCONN_COPY);
struct pbuf *pb;
for (int i = 0; i < 10; i++) {
TEST_ASSERT_EQUAL_INT_MESSAGE(ERR_OK, netconn_recv_tcp_pbuf(client, &pb),
"Failed to receive data");
if (pb->len > 0) {
memcpy(buf, pb->payload, pb->len);
buf[pb->len] = 0;
break;
}
vTaskDelay(100 / portTICK_PERIOD_MS);
}
TEST_ASSERT_TRUE_MESSAGE(pb->len > 0, "No data received");
printf("Device A: received data: %s\n", buf);
TEST_ASSERT_FALSE_MESSAGE(strcmp((const char*)buf, "test pong\r\n"),
"Received wrong data");
netconn_delete(client);
TEST_PASS();
}
static void a_04_wifi_basic(void)
{
printf("Device A started\n");
sdk_wifi_set_opmode(SOFTAP_MODE);
struct ip_info ap_ip;
IP4_ADDR(&ap_ip.ip, 172, 16, 0, 1);
IP4_ADDR(&ap_ip.gw, 0, 0, 0, 0);
IP4_ADDR(&ap_ip.netmask, 255, 255, 0, 0);
sdk_wifi_set_ip_info(1, &ap_ip);
struct sdk_softap_config ap_config = {
.ssid = AP_SSID,
.ssid_hidden = 0,
.channel = 3,
.ssid_len = strlen(AP_SSID),
.authmode = AUTH_WPA_WPA2_PSK,
.password = AP_PSK,
.max_connection = 3,
.beacon_interval = 100,
};
sdk_wifi_softap_set_config(&ap_config);
ip_addr_t first_client_ip;
IP4_ADDR(&first_client_ip, 172, 16, 0, 2);
dhcpserver_start(&first_client_ip, 4);
xTaskCreate(server_task, "setver_task", 1024, NULL, 2, NULL);
}
/*********************************************************
* WiFi client part
*********************************************************/
static void connect_task(void *pvParameters)
{
struct sockaddr_in serv_addr;
char buf[BUF_SIZE];
// wait for wifi connection
while (sdk_wifi_station_get_connect_status() != STATION_GOT_IP) {
vTaskDelay(1000 / portTICK_PERIOD_MS);
printf("Waiting for connection to AP\n");
}
int s = socket(AF_INET, SOCK_STREAM, 0);
TEST_ASSERT_TRUE_MESSAGE(s >= 0, "Failed to allocate a socket");
bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_port = htons(PORT);
serv_addr.sin_family = AF_INET;
TEST_ASSERT_TRUE_MESSAGE(inet_aton(SERVER, &serv_addr.sin_addr.s_addr),
"Failed to set IP address");
TEST_ASSERT_TRUE_MESSAGE(
connect(s, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == 0,
"Socket connection failed");
bzero(buf, BUF_SIZE);
int r = 0;
for (int i = 0; i < 10; i++) {
r = read(s, buf, BUF_SIZE);
if (r > 0) {
break;
}
vTaskDelay(100 / portTICK_PERIOD_MS);
}
TEST_ASSERT_TRUE_MESSAGE(r > 0, "No data received");
printf("Device B: received data: %s\n", buf);
TEST_ASSERT_FALSE_MESSAGE(strcmp((const char*)buf, "test ping\r\n"),
"Received wrong data");
snprintf(buf, sizeof(buf), "test pong\r\n");
printf("Device B: send data: %s\n", buf);
TEST_ASSERT_EQUAL_INT_MESSAGE(strlen(buf), write(s, buf, strlen(buf)),
"Error socket writing");
close(s);
TEST_PASS();
}
static void b_04_wifi_basic(void)
{
printf("Device B started\n");
struct sdk_station_config config = {
.ssid = AP_SSID,
.password = AP_PSK,
};
sdk_wifi_set_opmode(STATION_MODE);
sdk_wifi_station_set_config(&config);
xTaskCreate(&connect_task, "connect_task", 1024, NULL, 2, NULL);
}

View file

@ -9,14 +9,18 @@
#include "esp_spiffs.h"
#include "spiffs.h"
#include "fs-test/fs_test.h"
#include "fs_test.h"
#include "testcase.h"
DEFINE_SOLO_TESTCASE(05_spiffs)
static fs_time_t get_current_time()
{
return timer_get_count(FRC2) / 5000; // to get roughly 1ms resolution
}
void test_task(void *pvParameters)
static void test_task(void *pvParameters)
{
esp_spiffs_init();
esp_spiffs_mount();
@ -28,28 +32,19 @@ void test_task(void *pvParameters)
}
esp_spiffs_mount();
while (1) {
vTaskDelay(5000 / portTICK_PERIOD_MS);
if (fs_load_test_run(100)) {
printf("PASS\n");
} else {
printf("FAIL\n");
}
TEST_ASSERT_TRUE_MESSAGE(fs_load_test_run(100), "Load test failed");
vTaskDelay(5000 / portTICK_PERIOD_MS);
float write_rate, read_rate;
if (fs_speed_test_run(get_current_time, &write_rate, &read_rate)) {
printf("Read speed: %.0f bytes/s\n", read_rate * 1000);
printf("Write speed: %.0f bytes/s\n", write_rate * 1000);
} else {
printf("FAIL\n");
}
float write_rate, read_rate;
if (fs_speed_test_run(get_current_time, &write_rate, &read_rate)) {
printf("Read speed: %.0f bytes/s\n", read_rate * 1000);
printf("Write speed: %.0f bytes/s\n", write_rate * 1000);
} else {
TEST_FAIL();
}
TEST_PASS();
}
void user_init(void)
static void a_05_spiffs(void)
{
uart_set_baud(0, 115200);
xTaskCreate(test_task, "test_task", 1024, NULL, 2, NULL);
}

86
tests/cases/06_timers.c Normal file
View file

@ -0,0 +1,86 @@
#include <stdlib.h>
#include <string.h>
#include "FreeRTOS.h"
#include "task.h"
#include "etstimer.h"
#include "testcase.h"
DEFINE_SOLO_TESTCASE(06_ets_timers);
typedef struct {
ETSTimer handle;
uint32_t start_time;
uint32_t fire_count;
} test_timer_t;
#define TEST_TIMERS_NUMBER 2
static test_timer_t timers[TEST_TIMERS_NUMBER];
static uint32_t get_current_time()
{
return timer_get_count(FRC2) / 5000; // to get roughly 1ms resolution
}
static void timer_0_cb(void *arg)
{
uint32_t v = (uint32_t)arg;
uint32_t delay = get_current_time() - timers[0].start_time;
timers[0].fire_count++;
TEST_ASSERT_EQUAL_UINT32_MESSAGE(0xAA, v, "Timer 0 argument invalid");
TEST_ASSERT_EQUAL_INT_MESSAGE(1, timers[0].fire_count, "Timer 0 repeat error");
printf("Timer 0 delay: %d\n", delay);
// Timer should fire in 100ms
TEST_ASSERT_INT_WITHIN_MESSAGE(5, 100, delay, "Timer 0 time wrong");
}
static void timer_1_cb(void *arg)
{
uint32_t v = (uint32_t)arg;
uint32_t delay = get_current_time() - timers[1].start_time;
timers[1].start_time = get_current_time();
timers[1].fire_count++;
TEST_ASSERT_EQUAL_UINT32_MESSAGE(0xBB, v, "Timer 1 argument invalid");
TEST_ASSERT_TRUE_MESSAGE(timers[1].fire_count < 6,
"Timer 1 repeats after disarming");
printf("Timer 1 delay: %d\n", delay);
// Timer should fire in 100ms
TEST_ASSERT_INT_WITHIN_MESSAGE(5, 50, delay, "Timer 1 time wrong");
if (timers[1].fire_count == 5) {
sdk_ets_timer_disarm(&timers[1].handle);
}
}
static void test_task(void *pvParameters)
{
sdk_ets_timer_disarm(&timers[0].handle);
sdk_ets_timer_setfn(&timers[0].handle, timer_0_cb, (void*)0xAA);
timers[0].start_time = get_current_time();
sdk_ets_timer_arm(&timers[0].handle, 100, false);
sdk_ets_timer_disarm(&timers[1].handle);
sdk_ets_timer_setfn(&timers[1].handle, timer_1_cb, (void*)0xBB);
timers[1].start_time = get_current_time();
sdk_ets_timer_arm(&timers[1].handle, 50, true); // repeating timer
vTaskDelay(500 / portTICK_PERIOD_MS);
TEST_ASSERT_EQUAL_INT_MESSAGE(1, timers[0].fire_count,
"Timer hasn't fired");
TEST_ASSERT_EQUAL_INT_MESSAGE(5, timers[1].fire_count,
"Timer fire count isn't correct");
TEST_PASS();
}
static void a_06_ets_timers(void)
{
xTaskCreate(test_task, "test_task", 256, NULL, 2, NULL);
}

1
tests/fs-test Submodule

@ -0,0 +1 @@
Subproject commit e531bc0d4f75887e5f0e081aae3efbf4a50e2f54

59
tests/include/testcase.h Normal file
View file

@ -0,0 +1,59 @@
#ifndef _TESTCASE_H
#define _TESTCASE_H
#include <stdbool.h>
#include <stdio.h>
#include "esp/uart.h"
/* Unity is the framework with test assertions, etc. */
#include "unity.h"
/* Need to explicitly flag once a test has completed successfully. */
#undef TEST_PASS
#define TEST_PASS() do { UnityConcludeTest(); while(1) { } } while (0)
/* Types of test, defined by hardware requirements */
typedef enum {
SOLO, /* Test require "ESP A" only, no other connections */
DUAL, /* Test requires "ESP A" and "ESP "B", basic interconnections between them */
EYORE_TEST, /* Test requires an eyore-test board with onboard STM32F0 */
} testcase_type_t;
typedef void (testcase_fn_t)(void);
typedef struct {
const char *name;
const char *file;
int line;
testcase_type_t type;
testcase_fn_t *a_fn;
testcase_fn_t *b_fn;
} testcase_t;
void testcase_register(const testcase_t *testcase);
/* Register a test case using these macros. Use DEFINE_SOLO_TESTCASE for single-MCU tests,
and DEFINE_TESTCASE for all other test types.
*/
#define DEFINE_SOLO_TESTCASE(NAME) \
static testcase_fn_t a_##NAME; \
_DEFINE_TESTCASE_COMMON(NAME, SOLO, a_##NAME, 0)
#define DEFINE_TESTCASE(NAME, TYPE) \
static testcase_fn_t a_##NAME; \
static testcase_fn_t b_##NAME; \
_DEFINE_TESTCASE_COMMON(NAME, TYPE, a_##NAME, b_##NAME)
#define _DEFINE_TESTCASE_COMMON(NAME, TYPE, A_FN, B_FN) \
void __attribute__((constructor)) testcase_ctor_##NAME() { \
const testcase_t testcase = { .name = #NAME, \
.file = __FILE__, \
.line = __LINE__, \
.type = TYPE, \
.a_fn = A_FN, \
.b_fn = B_FN, \
}; \
testcase_register(&testcase); \
}
#endif

122
tests/test_main.c Normal file
View file

@ -0,0 +1,122 @@
#include "testcase.h"
#include <stdlib.h>
#include <esp/uart.h>
#include <string.h>
#include <FreeRTOS.h>
#include <task.h>
#include <espressif/esp_common.h>
/* Convert requirement enum to a string we can print */
static const char *get_requirements_name(const testcase_type_t arg) {
switch(arg) {
case SOLO:
return "SOLO";
case DUAL:
return "DUAL";
case EYORE_TEST:
return "EYORE_TEST";
default:
return "UNKNOWN";
}
}
static testcase_t *testcases;
static uint32_t testcases_count;
static uint32_t testcases_alloc;
void testcase_register(const testcase_t *testcase)
{
/* Grow the testcases buffer to fit the new test case,
this buffer will be freed before the test runs.
*/
if(testcases_count == testcases_alloc) {
testcases_alloc += 1;
testcases = realloc(testcases, testcases_alloc * sizeof(testcase_t));
if(!testcases) {
printf("Failed to reallocate test case register length %d\n",
testcases_alloc);
testcases_count = 0;
testcases_alloc = 0;
}
}
memcpy(&testcases[testcases_count++], testcase, sizeof(testcase_t));
}
void user_init(void)
{
uart_set_baud(0, 115200);
sdk_wifi_set_opmode(NULL_MODE);
printf("esp-open-rtos test runner.\n");
printf("%d test cases are defined:\n\n", testcases_count);
for(int i = 0; i < testcases_count; i++) {
printf("CASE %d = %s %s\n", i, testcases[i].name,
get_requirements_name(testcases[i].type));
}
printf("Enter A or B then number of test case to run, ie A0.\n");
int case_idx = -1;
char type;
do {
printf("> ");
uart_rxfifo_wait(0,1);
type = uart_getc(0);
if(type != 'a' && type != 'A' && type != 'b' && type != 'B') {
printf("Type must be A or B.\n");
continue;
}
char idx_buf[6];
for(int c = 0; c < sizeof(idx_buf); c++) {
uart_rxfifo_wait(0,1);
idx_buf[c] = uart_getc(0);
if(idx_buf[c] == ' ') { /* Ignore spaces */
c--;
continue;
}
if(idx_buf[c] == '\r' || idx_buf[c] == '\n') {
idx_buf[c] = 0;
case_idx = atoi(idx_buf);
break;
}
else if(idx_buf[c] < '0' || idx_buf[c] > '9') {
break;
}
}
if(case_idx == -1) {
printf("Invalid case index");
}
else if(case_idx < 0 || case_idx >= testcases_count) {
printf("Test case index out of range.\n");
}
else if((type == 'b' || type =='B') && testcases[case_idx].type == SOLO) {
printf("No ESP type B for 'SOLO' test cases.\n");
} else {
break;
}
} while(1);
if(type =='a')
type = 'A';
else if (type == 'b')
type = 'B';
testcase_t testcase;
memcpy(&testcase, &testcases[case_idx], sizeof(testcase_t));
/* Free the register of test cases now we have the one we're running */
free(testcases);
testcases_alloc = 0;
testcases_count = 0;
printf("\nRunning test case %d (%s %s) as instance %c "
"\nDefinition at %s:%d\n***\n", case_idx,
testcase.name, get_requirements_name(testcase.type), type,
testcase.file, testcase.line);
Unity.CurrentTestName = testcase.name;
Unity.TestFile = testcase.file;
Unity.CurrentTestLineNumber = testcase.line;
Unity.NumberOfTests = 1;
if(type=='A')
testcase.a_fn();
else
testcase.b_fn();
}

389
tests/test_runner.py Executable file
View file

@ -0,0 +1,389 @@
#!/usr/bin/env python3
import sys
import argparse
import subprocess
import os
import serial
import threading
import re
import time
SHORT_OUTPUT_TIMEOUT = 0.25 # timeout for resetting and/or waiting for more lines of output
TESTCASE_TIMEOUT = 60
TESTRUNNER_BANNER = "esp-open-rtos test runner."
RESET_RETRIES = 10 # retries to receive test runner banner after reset
def run(env_a, env_b, cases):
counts = dict((status, 0) for status in TestResult.STATUS_NAMES.keys())
failures = False
for test in cases:
if test.case_type == 'dual':
if env_b is None:
res = TestResult(TestResult.SKIPPED, 'Dual test case skipped')
else:
res = test.run(env_a, env_b)
else:
res = test.run(env_a)
counts[res.status] += 1
failures = failures or res.is_failure()
print("%20s: %d" % ("Total tests", sum(c for c in counts.values())))
print()
# print status counts for tests
for c in sorted(counts.keys()):
print("%20s: %d" % (TestResult.STATUS_NAMES[c], counts[c]))
return failures == 0
def main():
global verbose
args = parse_args()
verbose = args.verbose
if not args.no_flash:
flash_image(args.aport)
if args.type != 'solo':
flash_image(args.bport)
env = TestEnvironment(args.aport, TestEnvironment.A)
env_b = None
cases = env.get_testlist()
if args.type != 'solo':
env_b = TestEnvironment(args.bport, TestEnvironment.B)
cases_b = env_b.get_testlist()
if cases != cases_b:
raise TestRunnerError("Test cases on units A & B don't match")
if args.list: # if list option is specified, do not run test cases
print("List of test cases:")
for test in cases:
print(test)
sys.exit(0)
if args.testcases: # if testcases is specified run only those cases
cases = [c for c in cases if str(c.index) in args.testcases]
sys.exit(0 if run(env, env_b, cases) else 1)
class TestCase(object):
def __init__(self, index, name, case_type):
self.name = name
self.index = index
self.case_type = case_type
def __repr__(self):
return "#%d: %s (%s)" % (self.index, self.name, self.case_type)
def __eq__(self, other):
return (self.index == other.index and
self.name == other.name and
self.case_type == other.case_type)
def run(self, env_a, env_b=None):
"""
Run the test represented by this instance, against the environment(s) passed in.
Returns a TestResult
"""
sys.stdout.write("Running test case '%s'...%s" % (self.name, "\n" if verbose else " "*(40-len(self.name))))
mon_a = env_a.start_testcase(self)
mon_b = env_b.start_testcase(self) if env_b else None
while True:
if mon_a.get_result() and (mon_b is None or mon_b.get_result()):
break # all running test environments have finished
# or, in the case both are running, stop as soon as either environemnt shows a failure
try:
if mon_a.get_result().is_failure():
mon_b.cancel()
break
except AttributeError:
pass
try:
if mon_b.get_result().is_failure():
mon_a.cancel()
break
except AttributeError:
pass
time.sleep(0.1)
if mon_b is not None:
# return whichever result is more severe
res = max(mon_a.get_result(), mon_b.get_result())
else:
res = mon_a.get_result()
if not verbose: # finish the line after the ...
print(TestResult.STATUS_NAMES[res.status])
if res.is_failure():
message = res.message
if "/" in res.message: # cut anything before the file name in the failure
message = message[message.index("/"):]
print("FAILURE MESSAGE:\n%s\n" % message)
return res
class TestResult(object):
""" Class to wrap a test result code and a message """
# Test status flags, higher = more severe
CANCELLED = 0
SKIPPED = 1
PASSED = 2
FAILED = 3
ERROR = 4
STATUS_NAMES = {
CANCELLED: "Cancelled",
SKIPPED: "Skipped",
PASSED: "Passed",
FAILED: "Failed",
ERROR: "Error"
}
def __init__(self, status, message):
self.status = status
self.message = message
def is_failure(self):
return self.status >= TestResult.FAILED
def __qe__(self, other):
if other is None:
return False
else:
return self.status == other.status
def __lt__(self, other):
if other is None:
return False
else:
return self.status < other.status
class TestMonitor(object):
""" Class to monitor a running test case in a separate thread, defer reporting of the result until it's done.
Can poll for completion by calling is_done(), read a TestResult via .get_result()
"""
def __init__(self, port, instance):
super(TestMonitor, self).__init__()
self._thread = threading.Thread(target=self._monitorThread)
self._port = port
self._instance = instance
self._result = None
self._cancelled = False
self.output = ""
self._thread.start()
def cancel(self):
self._cancelled = True
def is_done(self):
return self._result is not None
def get_result(self):
return self._result
def _monitorThread(self):
self.output = ""
start_time = time.time()
self._port.timeout = SHORT_OUTPUT_TIMEOUT
try:
while not self._cancelled and time.time() < start_time + TESTCASE_TIMEOUT:
line = self._port.readline().decode("utf-8", "ignore")
if line == "":
continue # timed out
self.output += "%s+%4.2fs %s" % (self._instance, time.time()-start_time, line)
verbose_print(line.strip())
if line.endswith(":PASS\r\n"):
self._result = TestResult(TestResult.PASSED, "Test passed.")
return
elif ":FAIL:" in line:
self._result = TestResult(TestResult.FAILED, line)
return
elif line == TESTRUNNER_BANNER:
self._result = TestResult(TestResult.ERROR, "Test caused crash and reset.")
return
if not self._cancelled:
self._result = TestResult(TestResult.CANCELLED, "Cancelled")
else:
self._result = TestResult(TestResult.ERROR, "Test timed out")
finally:
self._port.timeout = None
class TestEnvironment(object):
A = "A"
B = "B"
def __init__(self, port, instance):
self._name = port
self._port = TestSerialPort(port, baudrate=115200)
self._instance = instance
def reset(self):
""" Resets the test board, and waits for the test runner program to start up """
for i in range(RESET_RETRIES):
self._port.setDTR(False)
self._port.setRTS(True)
time.sleep(0.05)
self._port.flushInput()
self._port.setRTS(False)
verbose_print("Waiting for test runner startup...")
if self._port.wait_line(lambda line: line == TESTRUNNER_BANNER):
return
else:
verbose_print("Retrying to reset the test board, attempt=%d" %
(i + 1))
continue
raise TestRunnerError("Port %s failed to start test runner" % self._port)
def get_testlist(self):
""" Resets the test board and returns the enumerated list of all supported tests """
self.reset()
tests = []
verbose_print("Enumerating tests...")
def collect_testcases(line):
if line.startswith(">"):
return True # prompt means list of test cases is done, success
m = re.match(r"CASE (\d+) = (.+?) ([A-Z]+)", line)
if m is not None:
t = TestCase(int(m.group(1)), m.group(2), m.group(3).lower())
verbose_print(t)
tests.append(t)
if not self._port.wait_line(collect_testcases):
raise TestRunnerError("Port %s failed to read test list" % self._port)
verbose_print("Port %s found %d test cases" % (self._name, len(tests)))
return tests
def start_testcase(self, case):
""" Starts the specified test instance and returns a TestMonitor reader thread instance
to monitor the output
"""
# synchronously start the test case
self.reset()
if not self._port.wait_line(lambda line: line.startswith(">")):
raise TestRunnerError("Failed to read test runnner prompt")
command = "%s%d\r\n" % (self._instance, case.index)
self._port.write(command.encode("utf-8"))
return TestMonitor(self._port, self._instance)
def get_testdir():
"""
Return the 'tests' directory in the source tree
(assuming the test_runner.py script is in that directory.
"""
res = os.path.dirname(__name__)
return "." if res == "" else res
def flash_image(serial_port):
# Bit hacky: rather than calling esptool directly,
# just use the Makefile flash target with the correct ESPPORT argument
env = dict(os.environ)
env["ESPPORT"] = serial_port
verbose_print("Building and flashing test image to %s..." % serial_port)
try:
stdout = sys.stdout if verbose else None
subprocess.check_call(["make", "flash"], cwd=get_testdir(),
stdout=stdout, stderr=subprocess.STDOUT, env=env)
except subprocess.CalledProcessError as e:
raise TestRunnerError("'make flash EPPORT=%s' failed with exit code %d" %
(serial_port, e.returncode))
verbose_print("Flashing successful.")
def parse_args():
parser = argparse.ArgumentParser(description='esp-open-rtos testrunner', prog='test_runner')
parser.add_argument(
'--type', '-t',
help='Type of test hardware attached to serial ports A & (optionally) B',
choices=['solo', 'dual', 'eyore_test'], default='solo')
parser.add_argument(
'--aport', '-a',
help='Serial port for device A',
default='/dev/ttyUSB0')
parser.add_argument(
'--bport', '-b',
help='Serial port for device B (ignored if type is \'solo\')',
default='/dev/ttyUSB1')
parser.add_argument(
'--no-flash', '-n',
help='Don\'t flash the test binary image before running tests',
action='store_true',
default=False)
parser.add_argument(
'--list', '-l',
help='Display list of available test cases on a device',
action='store_true',
default=False)
parser.add_argument(
'--verbose', '-v',
help='Verbose test runner debugging output',
action='store_true',
default=False)
parser.add_argument('testcases', nargs='*',
help='Optional list of test case numbers to run. '
'By default, all tests are run.')
return parser.parse_args()
class TestRunnerError(RuntimeError):
def __init__(self, message):
RuntimeError.__init__(self, message)
class TestSerialPort(serial.Serial):
def __init__(self, *args, **kwargs):
super(TestSerialPort, self).__init__(*args, **kwargs)
def wait_line(self, callback, timeout=SHORT_OUTPUT_TIMEOUT):
""" Wait for the port to output a particular piece of line content, as judged by callback
Callback called as 'callback(line)' and returns not-True if non-match otherwise can return any value.
Returns first non-False result from the callback, or None if it timed out waiting for a new line.
Note that a serial port spewing legitimate lines of output may block this function forever, if callback
doesn't detect this is happening.
"""
self.timeout = timeout
try:
res = None
while not res:
line = self.readline()
if line == b"":
break # timed out
line = line.decode("utf-8", "ignore").rstrip()
res = callback(line)
return res
finally:
self.timeout = None
verbose = False
def verbose_print(msg):
if verbose:
print(msg)
if __name__ == '__main__':
try:
main()
except TestRunnerError as e:
print(e)
sys.exit(2)

1
tests/unity Submodule

@ -0,0 +1 @@
Subproject commit bbf2fe3a934f96cd00693841247a689e57a17b0d