improve testing

This commit is contained in:
Tim Blume 2021-04-26 01:24:08 +02:00
parent 0214db88c3
commit 75a5ba79db
5 changed files with 94 additions and 37 deletions

View file

@ -10,10 +10,11 @@ class BigSnitchBridge:
def __init__(self): def __init__(self):
print("BigSnitchBridge started") print("BigSnitchBridge started")
self.q = Queue() self.q = Queue()
self.thread = NetworkThread("network", self.q) #self.thread = NetworkThread("network", self.q)
self.thread.start() #self.thread.start()
def request(self, flow): def request(self, flow):
pdb.set_trace()
flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flow) flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flow)
self.q.put_nowait((flow.id, flowitem)) self.q.put_nowait((flow.id, flowitem))
# intercept until ACK received # intercept until ACK received

View file

@ -36,7 +36,6 @@ class bHeader:
key: str key: str
value: str value: str
@dataclass @dataclass
class bRequest: class bRequest:
server_ip_address: str server_ip_address: str
@ -56,23 +55,23 @@ class bRequest:
error: str error: str
# init from dict # init from flow dict
def __init__(self, flow: dict): def __init__(self, flow: dict):
self.server_ip_address = flow["server_ip_address"] self.server_ip_address = flow["server_conn"]["ip_address"][0]
self.tls = flow["server_conn"]["tls_established"] self.tls = flow["server_conn"]["tls_established"]
self.content = flow["content"] self.content = flow["request"]["content"]
self.scheme = flow["scheme"] self.scheme = flow["request"]["scheme"]
self.method = flow["method"] self.method = flow["request"]["method"]
self.host = flow["host"] self.host = flow["request"]["host"]
self.port = flow["port"] self.port = flow["request"]["port"]
self.http_version = flow["http_version"] self.http_version = flow["request"]["http_version"]
self.timestamp_start = flow["timestamp_start"] self.timestamp_start = flow["request"]["timestamp_start"]
self.timestamp_end = flow["timestamp_end"] self.timestamp_end = flow["request"]["timestamp_end"]
for k,v in flow["headers"]: self.headers = []
for k,v in flow["request"]["headers"]:
self.headers.append(bHeader(k,v)) self.headers.append(bHeader(k,v))
@dataclass @dataclass
class bResponse: class bResponse:
status_code: int status_code: int
@ -95,19 +94,6 @@ class bResponse:
for k,v in flow["headers"]: for k,v in flow["headers"]:
self.headers.append(bHeader(k,v)) self.headers.append(bHeader(k,v))
@dataclass
class bFlow:
uid: str
request: bRequest
response: bResponse
def __init__(self, flow: dict):
self.uid = flow["id"]
self.request = bRequest(flow["request"])
self.response = bResponse(flow["response"])
@dataclass @dataclass
class bFlowState(Enum): class bFlowState(Enum):
ERROR = 0 ERROR = 0
@ -116,7 +102,6 @@ class bFlowState(Enum):
UNSENT_HTTP_RESPONSE = 3 UNSENT_HTTP_RESPONSE = 3
SENT_HTTP_RESPONSE = 4 SENT_HTTP_RESPONSE = 4
@dataclass @dataclass
class bPacketType: class bPacketType:
NACK = 0 NACK = 0
@ -128,19 +113,17 @@ class bPacketType:
HTTP_REQUEST = 6 HTTP_REQUEST = 6
HTTP_RESPONSE = 7 HTTP_RESPONSE = 7
@dataclass @dataclass
class bPacket: class bPacket:
ptype: bPacketType ptype: bPacketType
flowid: int flowid: str
data: str data: str
def __init__(self, json: Dict): def __init__(self, json: Dict):
self.ptype = json["type"] self.ptype = json["type"]
self.flowid = int(json["id"]) self.flowid = str(json["id"])
self.data = json["data"] self.data = json["data"]
@dataclass @dataclass
class FlowItem: class FlowItem:
state: bFlowState state: bFlowState

View file

@ -1,3 +1,5 @@
mitmproxy mitmproxy
mitmdump mitmdump
pyzmq pyzmq
deepdiff
pytest

View file

@ -3,12 +3,69 @@
import pdb import pdb
import pytest import pytest
from networkthread import NetworkThread from networkthread import bPacket, bRequest, bResponse, bHeader, NetworkThread
import os import os
import tempfile import tempfile
from queue import Queue from queue import Queue
import zmq import zmq
from deepdiff import DeepDiff
# usual flow state of the request with some big parts removed
@pytest.fixture
def flowstate_request():
return {'client_conn': {'address': ('::ffff:127.0.0.1', 60630, 0, 0),
'alpn_proto_negotiated': b'http/1.1',
'cipher_name': 'TLS_AES_256_GCM_SHA384',
'clientcert': None,
'id': '5dde7ef8-9b1a-4b60-9d15-d308442a27ea',
'mitmcert': '',
'sni': 'yolo.jetzt',
'timestamp_end': None,
'timestamp_start': 1619390481.8003347,
'timestamp_tls_setup': 1619390482.6879823,
'tls_established': True,
'tls_extensions': [],
'tls_version': 'TLSv1.3'},
'error': None,
'id': '51215b69-c76f-4ac2-afcb-da3b823d9f88',
'intercepted': False,
'is_replay': None,
'marked': False,
'metadata': {},
'mode': 'transparent',
'request': {'authority': b'',
'content': b'',
'headers': ((b'Host', b'yolo.jetzt'),
(b'User-Agent', b'curl/7.75.0'),
(b'Accept', b'*/*')),
'host': 'yolo.jetzt',
'http_version': b'HTTP/1.1',
'method': b'GET',
'path': b'/',
'port': 443,
'scheme': b'https',
'timestamp_end': 1619390482.69,
'timestamp_start': 1619390482.6886377,
'trailers': None},
'response': None,
'server_conn': {'address': ('yolo.jetzt', 443),
'alpn_proto_negotiated': b'http/1.1',
'cert': '',
'id': 'ecc4cd3b-7e35-4815-b618-5931fe64729b',
'ip_address': ('95.156.226.69', 443),
'sni': 'yolo.jetzt',
'source_address': ('192.168.42.182', 51514),
'timestamp_end': None,
'timestamp_start': 1619390481.8154442,
'timestamp_tcp_setup': 1619390481.994565,
'timestamp_tls_setup': 1619390482.6819758,
'tls_established': True,
'tls_version': 'TLSv1.2',
'via': None},
'type': 'http',
'version': 9}
class MitmAddonTestServer: class MitmAddonTestServer:
def __init__(self, queue, path: str): def __init__(self, queue, path: str):
self.queue = queue self.queue = queue
@ -44,8 +101,22 @@ def client_server():
server.disconnect() server.disconnect()
class TestBigSnitchWrapper: class TestBigSnitchWrapper:
def test_request_convert(self): def test_request_convert(self, flowstate_request):
pass req = bRequest(flow=flowstate_request)
d = {'content': b'',
'headers': [bHeader(key=b'Host', value=b'yolo.jetzt'),
bHeader(key=b'User-Agent', value=b'curl/7.75.0'),
bHeader(key=b'Accept', value=b'*/*')],
'host': 'yolo.jetzt',
'http_version': b'HTTP/1.1',
'method': b'GET',
'port': 443,
'scheme': b'https',
'server_ip_address': '95.156.226.69',
'timestamp_end': 1619390482.69,
'timestamp_start': 1619390482.6886377,
'tls': True}
assert not DeepDiff(req.__dict__, d)
""" """
class TestMitmAddon: class TestMitmAddon:
def test_request(self, client_server): def test_request(self, client_server):

View file

@ -12,4 +12,4 @@ deps = -r{toxinidir}/requirements.txt
setenv = setenv =
PYTHONDONTWRITEBYTECODE=1 PYTHONDONTWRITEBYTECODE=1
commands = commands =
{envpython} sh ../test.sh pytest