From c20ec945a1dfff749b9455bb26c29efa758b98b3 Mon Sep 17 00:00:00 2001 From: Tim Blume Date: Tue, 27 Apr 2021 16:23:37 +0200 Subject: [PATCH] more tests --- mitmaddon/bigsnitch.py | 10 +- mitmaddon/networkthread.py | 378 +++++++++++++++--------------- mitmaddon/requirements.txt | 1 + mitmaddon/test_bigsnitch.py | 442 ++++++++++++++++++++++++++++++------ 4 files changed, 567 insertions(+), 264 deletions(-) diff --git a/mitmaddon/bigsnitch.py b/mitmaddon/bigsnitch.py index a24a867..2851a90 100644 --- a/mitmaddon/bigsnitch.py +++ b/mitmaddon/bigsnitch.py @@ -3,18 +3,18 @@ import pdb -from queue import Queue, Empty +from queue import Queue from networkthread import bFlowState, FlowItem, NetworkThread + class BigSnitchBridge: def __init__(self): print("BigSnitchBridge started") self.q = Queue() - #self.thread = NetworkThread("network", self.q) - #self.thread.start() + self.thread = NetworkThread("network", self.q) + self.thread.start() def request(self, flow): - pdb.set_trace() flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flow) self.q.put_nowait((flow.id, flowitem)) # intercept until ACK received @@ -27,7 +27,7 @@ class BigSnitchBridge: flow.intercept() def error(self, flow): - flowitem = FlowItem(bFlowState.ERROR, flow, time.monotonic()) + flowitem = FlowItem(bFlowState.ERROR, flow) self.q.put_nowait((flow.id, flowitem)) diff --git a/mitmaddon/networkthread.py b/mitmaddon/networkthread.py index db6f4f4..b8a79c7 100644 --- a/mitmaddon/networkthread.py +++ b/mitmaddon/networkthread.py @@ -1,6 +1,5 @@ import pdb -from mitmproxy import ctx from mitmproxy.flow import Flow import threading import time @@ -9,33 +8,34 @@ import json import os from enum import Enum from dataclasses import dataclass -from typing import List, Dict +from typing import List, Dict, Any from queue import Queue, Empty -""" -# this method is used to convert flow states (generated with get_state()) to json -def convert_to_strings(obj): - if isinstance(obj, dict): - return {convert_to_strings(key): convert_to_strings(value) - for key, value in obj.items()} - elif isinstance(obj, list) or isinstance(obj, tuple): - return [convert_to_strings(element) for element in obj] - elif isinstance(obj, bytes): - try: - data = obj.decode('unicode-escape').encode('latin1').decode('utf-8') - except: - print(obj) - data = str(obj)[2:-1] - return data - return obj -""" +# this method is used to convert flow states (generated with get_state()) to json +def convert_to_strings(obj: Any) -> Any: + if isinstance(obj, dict): + return {convert_to_strings(key): convert_to_strings(value) + for key, value in obj.items()} + elif isinstance(obj, list) or isinstance(obj, tuple): + return [convert_to_strings(element) for element in obj] + elif isinstance(obj, bytes): + try: + data = obj.decode('unicode-escape').encode('latin1').decode('utf-8') + except: + print(obj) + data = str(obj)[2:-1] + return data + + return obj + @dataclass class bHeader: key: str value: str + @dataclass class bRequest: server_ip_address: str @@ -57,6 +57,8 @@ class bRequest: # init from flow dict def __init__(self, flow: dict): + flow = convert_to_strings(flow) + self.server_ip_address = flow["server_conn"]["ip_address"][0] self.tls = flow["server_conn"]["tls_established"] self.content = flow["request"]["content"] @@ -69,8 +71,12 @@ class bRequest: self.timestamp_end = flow["request"]["timestamp_end"] self.headers = [] - for k,v in flow["request"]["headers"]: - self.headers.append(bHeader(k,v)) + for k, v in flow["request"]["headers"]: + self.headers.append(bHeader(str(k), str(v))) + + def json(self) -> dict: + return {} + @dataclass class bResponse: @@ -84,15 +90,21 @@ class bResponse: headers: List[bHeader] def __init__(self, flow: dict): - self.status_code = flow["status_code"] - self.http_version = flow["http_version"] - self.reason = flow["reason"] - self.content = flow["content"] - self.timestamp_start = flow["timestamp_start"] - self.timestamp_end = flow["timestamp_end"] + flow = convert_to_strings(flow) + self.status_code = flow["response"]["status_code"] + self.http_version = flow["response"]["http_version"] + self.reason = flow["response"]["reason"] + self.content = flow["response"]["content"] + self.timestamp_start = flow["response"]["timestamp_start"] + self.timestamp_end = flow["response"]["timestamp_end"] + + self.headers = [] + for k, v in flow["response"]["headers"]: + self.headers.append(bHeader(k, v)) + + def json(self) -> dict: + return {} - for k,v in flow["headers"]: - self.headers.append(bHeader(k,v)) @dataclass class bFlowState(Enum): @@ -102,6 +114,7 @@ class bFlowState(Enum): UNSENT_HTTP_RESPONSE = 3 SENT_HTTP_RESPONSE = 4 + @dataclass class bPacketType: NACK = 0 @@ -113,6 +126,7 @@ class bPacketType: HTTP_REQUEST = 6 HTTP_RESPONSE = 7 + @dataclass class bPacket: ptype: bPacketType @@ -124,6 +138,7 @@ class bPacket: self.flowid = str(json["id"]) self.data = json["data"] + @dataclass class FlowItem: state: bFlowState @@ -131,178 +146,163 @@ class FlowItem: time: float = 0 retries_left: int = 5 + """ The network thread communicates with the bigsnitch plugin using zeromq. """ class NetworkThread(threading.Thread): - def __init__(self, name: str, queue: Queue, path: str = None): - threading.Thread.__init__(self) - self.name = name - # path - self.path = path - if not self.path: - self.path = os.environ.get("BIGSNITCH_PATH", None) - if not self.path: - self.path = "tcp://127.0.0.1:12345" + def __init__(self, name: str, queue: Queue, path: str = None): + threading.Thread.__init__(self) + self.name = name + # path + self.path = path + if not self.path: + self.path = os.environ.get("BIGSNITCH_PATH", None) + if not self.path: + self.path = "tcp://127.0.0.1:12345" - # queue for communicating with the main mitmproxy thread - # contains tuples of (id, FlowItem) - self.q = queue - # for zmq use - self.context = zmq.Context() - # all current flows being handled by mitmproxy - self.flows: Dict[FlowItem] = {} - # timer for sending pings to check if the connection broke - self.timer = time.monotonic() - # retries left for reconnecting - self.retries = 5 + # queue for communicating with the main mitmproxy thread + # contains tuples of (id, FlowItem) + self.q = queue + # for zmq use + self.context = zmq.Context() + # all current self.flows being handled by mitmproxy + self.flows: Dict[FlowItem] = {} + # timer for sending pings to check if the connection broke + self.timer = time.monotonic() + # retries left for reconnecting + self.retries = 5 - # send a single message, no checks involved - def send(self, msg): - a = convert_to_strings(msg) - self.socket.send(str.encode(json.dumps(a))) + # send a single message, no checks involved + def send(self, msg): + #a = convert_to_strings(msg) + self.socket.send(str.encode(json.dumps(msg))) - # add new flows from the queue - def get_new_flows(self): - while True: - try: - # get new flows that may occured - i, flowitem = self.q.get(block=False) - if self.flows.get(i, None): - print(f"flow {i} doubled? ignoring...") + # add new self.flows from the queue + def get_new_flows(self): + while True: + try: + # get new self.flows that may occured + i, flowitem = self.q.get(block=False) + if self.flows.get(i, None): + print(f"flow {i} doubled? ignoring...") + continue + else: + self.flows[i] = flowitem + + except Empty: + break + + def send_packet(self, pkg: bPacket): + msg = {"type": pkg.ptype, "id": pkg.flowid, "data": pkg.data} + self.send(msg) + + def send_http_request(self, id: int, request: bRequest): + pkg = bPacket(bPacketType.HTTP_REQUEST, request.json()) + self.send_packet(pkg) + self.flows[id].state = bFlowState.SENT_HTTP_REQUEST + self.flows[id].time = time.monotonic() + + def send_http_response(self, id: int, response: bResponse): + pkg = bPacket(bPacketType.HTTP_RESPONSE, response.json()) + self.send_packet(pkg) + self.flows[id].state = bFlowState.SENT_HTTP_RESPONSE + self.flows[id].time = time.monotonic() + + # update all current self.flows + # handles the state machine for each flow + def update_flows(self): + for id, flow in self.flows.items(): + if self.flows[id].retries <= 0: + self.flows[id].flow.kill() + print(f"http flow {id} timed out! flow killed.") + + delta = time.monotonic() - self.flows[id].time + + if flow.state == bFlowState.UNSENT_HTTP_REQUEST or \ + flow.state == bFlowState.SENT_HTTP_REQUEST and delta > 5: + self.send_http_request(id, bRequest(flow.flow)) + self.flows[id].retries -= 1 + + if flow.state == bFlowState.UNSENT_HTTP_RESPONSE or \ + flow.state == bFlowState.SENT_HTTP_RESPONSE and delta > 5: + self.send_http_response(id, bResponse(flow.flow)) + self.flows[id].retries -= 1 + + elif flow.state == bFlowState.ERROR: + print(f"error in flow {id}!") + + # handle incoming packets / update the statemachine + def handle_packets(self): + while((self.socket.poll(50) & zmq.POLLIN) != 0): + msg = self.socket.recv() + try: + if msg: + result = json.loads(str(msg)) + pkg = bPacket(json=result) + # flow ACKed + if pkg.ptype == bPacketType.ACK: + continue + # flow killed + elif pkg.ptype == bPacketType.KILL: continue else: - self.flows.append(flowitem) + print(f"got unexpected message {pkg.ptype}") + except json.JSONDecodeError: + print(f"malformed message received {msg}") - except Empty: - break + def run(self): + print("thread started") + self.connect() + while True: + self.timer = time.monotonic() + self.get_new_flows() + self.handle_packets() + self.update_flows() - def send_packet(self, pkg: bPacket): - msg = {"type": pkg.ptype, "id": pkg.flowid, "data": pkg.data} - self.send(msg) - - def send_http_request(self, id: int, request: bRequest): - pkg = bPacket(bPacketType.HTTP_REQUEST, id, request.json()) - self.send_packet(pkg) - flows[id].state = bFlowState.SENT_HTTP_REQUEST - flows[id].time = time.monotonic() - - def send_http_response(self, id: int, response: bResponse): - pkg = bPacket(bPacketType.HTTP_RESPONSE, id, response.json()) - self.send_packet(pkg) - flows[id].state = bFlowState.SENT_HTTP_RESPONSE - flows[id].time = time.monotonic() - - # update all current flows - # handles the state machine for each flow - def update_flows(self): - for id, flow in self.flows.items(): - # send the request if not sent - if state == bFlowState.UNSENT_HTTP_REQUEST: - self.send_request(id, bRequest(flow.flow)) - - elif state == bFlowState.SENT_HTTP_REQUEST: - # check timer, try resend - delta = time.monotonic() - flows[id].time - if delta > 5: - self.send_request(id, bRequest(flow.flow)) - flows[id].retries -= 1 - continue - - if flows[id].retries <= 0: - flows[id].flow.kill() - print(f"http request {id} timed out! flow killed.") - - # send the response if not sent - elif state == bFlowState.UNSENT_HTTP_RESPONSE: - self.send_response(id, bResponse(flow.flow)) - - elif state == bFlowState.SENT_HTTP_RESPONSE: - # check timer, try resend - delta = time.monotonic() - flows[id].time - if delta > 5: - self.send_response(id, bResponse(flow.flow)) - flows[id].retries -= 1 - continue - - if flows[id].retries <= 0: - flows[id].flow.kill() - print(f"http response {id} timedout! flow killed.") - - elif state == bFlowState.ERROR: - print(f"error in flow {id}!") - - # handle incoming packets / update the statemachine - def handle_packets(self): - while((self.socket.poll(50) & zmq.POLLIN) != 0): - msg = self.socket.recv() - try: - if msg: - result = json.loads(msg) - pkg = bPacket(json=result) - # flow ACKed - if pkg.ptype == bPacketType.ACK: - continue - # flow killed - elif pkg.ptype == bPacketType.KILL: - continue - else: - print(f"got unexpected message {pkg.ptype}") - except json.JSONDecodeError: - print(f"malformed message received {msg}") - - def run(self): - print("thread started") - self.connect() - while True: - self.timer = time.monotonic() - self.get_new_flows() - self.handle_packets() - self.update_flows() - - if self.timer - time.monotonic() < -5: - pass - #self.send_msg_and_ack({"msg": "ping"}) - - def disconnect(self): - self.socket.setsockopt(zmq.LINGER,0) - self.socket.close() - print("disconnected") - - def reconnect(self): - print("reconnecting") - self.disconnect() - time.sleep(1) - self.connect() - - def connect(self): - self.socket = self.context.socket(zmq.PAIR) - self.socket.connect(self.path) + if self.timer - time.monotonic() < -5: + pass #self.send_msg_and_ack({"msg": "ping"}) - print("connected") - """ - def send_msg_and_ack(self, msg): - self.timer = time.monotonic() - while True: - #print("m sending") - self.send(msg) - if (self.socket.poll(50) & zmq.POLLIN) != 0: - msg = self.socket.recv() - try: - if msg: - result = json.loads(msg) - if result["msg"] == "ack": - print("m ack received") - return result - else: - print("got unexpected message {result}") - - except json.JSONDecodeError: - print(f"malformed message received {msg}") - print("no ack received, reconnecting...") - self.reconnect() - return NO_MSG - """ \ No newline at end of file + def disconnect(self): + self.socket.setsockopt(zmq.LINGER,0) + self.socket.close() + print("disconnected") + + def reconnect(self): + print("reconnecting") + self.disconnect() + time.sleep(1) + self.connect() + + def connect(self): + self.socket = self.context.socket(zmq.PAIR) + self.socket.connect(self.path) + #self.send_msg_and_ack({"msg": "ping"}) + print("connected") + + """ + def send_msg_and_ack(self, msg): + self.timer = time.monotonic() + while True: + #print("m sending") + self.send(msg) + if (self.socket.poll(50) & zmq.POLLIN) != 0: + msg = self.socket.recv() + try: + if msg: + result = json.loads(msg) + if result["msg"] == "ack": + print("m ack received") + return result + else: + print("got unexpected message {result}") + + except json.JSONDecodeError: + print(f"malformed message received {msg}") + print("no ack received, reconnecting...") + self.reconnect() + return NO_MSG + """ \ No newline at end of file diff --git a/mitmaddon/requirements.txt b/mitmaddon/requirements.txt index be2d875..810f798 100644 --- a/mitmaddon/requirements.txt +++ b/mitmaddon/requirements.txt @@ -3,3 +3,4 @@ mitmdump pyzmq deepdiff pytest +tox diff --git a/mitmaddon/test_bigsnitch.py b/mitmaddon/test_bigsnitch.py index 0675add..9083fde 100644 --- a/mitmaddon/test_bigsnitch.py +++ b/mitmaddon/test_bigsnitch.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 import pdb +import queue import pytest -from networkthread import bPacket, bRequest, bResponse, bHeader, NetworkThread +from networkthread import bPacket, bRequest, bResponse, bHeader, NetworkThread, FlowItem, bFlowState import os import tempfile from queue import Queue @@ -11,60 +12,192 @@ import zmq from deepdiff import DeepDiff + # usual flow state of the request with some big parts removed @pytest.fixture def flowstate_request(): return {'client_conn': {'address': ('::ffff:127.0.0.1', 60630, 0, 0), - 'alpn_proto_negotiated': b'http/1.1', - 'cipher_name': 'TLS_AES_256_GCM_SHA384', - 'clientcert': None, - 'id': '5dde7ef8-9b1a-4b60-9d15-d308442a27ea', - 'mitmcert': '', - 'sni': 'yolo.jetzt', - 'timestamp_end': None, - 'timestamp_start': 1619390481.8003347, - 'timestamp_tls_setup': 1619390482.6879823, - 'tls_established': True, - 'tls_extensions': [], - 'tls_version': 'TLSv1.3'}, - 'error': None, - 'id': '51215b69-c76f-4ac2-afcb-da3b823d9f88', - 'intercepted': False, - 'is_replay': None, - 'marked': False, - 'metadata': {}, - 'mode': 'transparent', - 'request': {'authority': b'', - 'content': b'', - 'headers': ((b'Host', b'yolo.jetzt'), - (b'User-Agent', b'curl/7.75.0'), - (b'Accept', b'*/*')), - 'host': 'yolo.jetzt', - 'http_version': b'HTTP/1.1', - 'method': b'GET', - 'path': b'/', - 'port': 443, - 'scheme': b'https', - 'timestamp_end': 1619390482.69, - 'timestamp_start': 1619390482.6886377, - 'trailers': None}, - 'response': None, - 'server_conn': {'address': ('yolo.jetzt', 443), - 'alpn_proto_negotiated': b'http/1.1', - 'cert': '', - 'id': 'ecc4cd3b-7e35-4815-b618-5931fe64729b', - 'ip_address': ('95.156.226.69', 443), - 'sni': 'yolo.jetzt', - 'source_address': ('192.168.42.182', 51514), - 'timestamp_end': None, - 'timestamp_start': 1619390481.8154442, - 'timestamp_tcp_setup': 1619390481.994565, - 'timestamp_tls_setup': 1619390482.6819758, - 'tls_established': True, - 'tls_version': 'TLSv1.2', - 'via': None}, - 'type': 'http', - 'version': 9} + 'alpn_proto_negotiated': b'http/1.1', + 'cipher_name': 'TLS_AES_256_GCM_SHA384', + 'clientcert': None, + 'id': '5dde7ef8-9b1a-4b60-9d15-d308442a27ea', + 'mitmcert': '', + 'sni': 'yolo.jetzt', + 'timestamp_end': None, + 'timestamp_start': 1619390481.8003347, + 'timestamp_tls_setup': 1619390482.6879823, + 'tls_established': True, + 'tls_extensions': [], + 'tls_version': 'TLSv1.3'}, + 'error': None, + 'id': '51215b69-c76f-4ac2-afcb-da3b823d9f88', + 'intercepted': False, + 'is_replay': None, + 'marked': False, + 'metadata': {}, + 'mode': 'transparent', + 'request': {'authority': b'', + 'content': b'', + 'headers': ((b'Host', b'yolo.jetzt'), + (b'User-Agent', b'curl/7.75.0'), + (b'Accept', b'*/*')), + 'host': 'yolo.jetzt', + 'http_version': b'HTTP/1.1', + 'method': b'GET', + 'path': b'/', + 'port': 443, + 'scheme': b'https', + 'timestamp_end': 1619390482.69, + 'timestamp_start': 1619390482.6886377, + 'trailers': None}, + 'response': None, + 'server_conn': {'address': ('yolo.jetzt', 443), + 'alpn_proto_negotiated': b'http/1.1', + 'cert': '', + 'id': 'ecc4cd3b-7e35-4815-b618-5931fe64729b', + 'ip_address': ('95.156.226.69', 443), + 'sni': 'yolo.jetzt', + 'source_address': ('192.168.42.182', 51514), + 'timestamp_end': None, + 'timestamp_start': 1619390481.8154442, + 'timestamp_tcp_setup': 1619390481.994565, + 'timestamp_tls_setup': 1619390482.6819758, + 'tls_established': True, + 'tls_version': 'TLSv1.2', + 'via': None}, + 'type': 'http', + 'version': 9} + + +@pytest.fixture() +def flowstate_response(): + return {'client_conn': {'address': ('::ffff:127.0.0.1', 30190, 0, 0), + 'alpn_proto_negotiated': b'http/1.1', + 'cipher_name': 'TLS_AES_256_GCM_SHA384', + 'clientcert': None, + 'id': '2507e6ce-3132-4394-9432-f55fb5f55b05', + 'mitmcert': '', + 'sni': 'yolo.jetzt', + 'timestamp_end': None, + 'timestamp_start': 1619461916.6160116, + 'timestamp_tls_setup': 1619461916.7581937, + 'tls_established': True, + 'tls_extensions': [], + 'tls_version': 'TLSv1.3'}, + 'error': None, + 'id': '449d1a87-744f-4a18-9a5d-f085f99a5c62', + 'intercepted': True, + 'is_replay': None, + 'marked': False, + 'metadata': {}, + 'mode': 'transparent', + 'request': {'authority': b'', + 'content': b'', + 'headers': ((b'Host', b'yolo.jetzt'), + (b'User-Agent', b'curl/7.75.0'), + (b'Accept', b'*/*')), + 'host': 'yolo.jetzt', + 'http_version': b'HTTP/1.1', + 'method': b'GET', + 'path': b'/', + 'port': 443, + 'scheme': b'https', + 'timestamp_end': 1619461916.7603076, + 'timestamp_start': 1619461916.7588415, + 'trailers': None}, + 'response': {'content': b'\n\n \n\ntodays yolo - 3026 \n\n' + '\n' + '\n' + '
\n' + '
\n' + 'the yolo for today is
\n' + '3026
\n' + '
\n' + '
\n' + '
\n' + '\tCat\n' + '\t
\n' + '\tRegulation (EU) 2016/679 compliant\n' + '
\n' + '\n' + '\n' + '\n', + 'headers': [bHeader(key='Server', value='nginx'), + bHeader(key='Date', value='Mon, 26 Apr 2021 18:31:56 GMT'), + bHeader(key='Content-Type', value='text/html'), + bHeader(key='Content-Length', value='2460'), + bHeader(key='Last-Modified', value='Sun, 25 Apr 2021 22:00:00 GMT'), + bHeader(key='Connection', value='keep-alive'), + bHeader(key='ETag', value='"6085e660-99c"'), + bHeader(key='Strict-Transport-Security', value='max-age=31536000; includeSubDomains; preload'), + bHeader(key='X-Xss-Protection', value='1; mode=block'), + bHeader(key='X-Content-Type-Options', value='nosniff'), + bHeader(key='Content-Security-Policy', value="default-src 'self'; script-src 'self' 'unsafe-inline'; connect-src 'self'; img-src 'self'; style-src 'self' 'unsafe-inline';"), + bHeader(key='X-Frame-Options', value='SAMEORIGIN'), + bHeader(key='Referrer-Policy', value='no-referrer'), + bHeader(key='Accept-Ranges', value='bytes')], + 'http_version': 'HTTP/1.1', + 'reason': 'OK', + 'status_code': 200, + 'timestamp_end': 1619461916.7979567, + 'timestamp_start': 1619461916.7935555} + + assert not DeepDiff(res.__dict__, d) + + class TestMitmAddon: + def test_get_new_flows_empty(self, client_server): + queue, client, server = client_server + # queue empty, flows empty + assert queue.empty() + assert not len(client.flows) + + client.get_new_flows() + + # afterwards too + assert queue.empty() + assert not len(client.flows) + + def test_get_new_flows_single(self, client_server): + queue, client, server = client_server + def test_request(self, client_server): - self.client, self.server = client_server + queue, client, server = client_server # create request - flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flow) - self.q.put_nowait((flow.id, flowitem)) -""" \ No newline at end of file + #flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flowstate_request) + #self.q.put_nowait(('51215b69-c76f-4ac2-afcb-da3b823d9f88', flowitem)) \ No newline at end of file