bigsnitch/mitmaddon/test_bigsnitch.py
2021-04-28 19:20:07 +02:00

665 lines
No EOL
30 KiB
Python

#!/usr/bin/env python3
import pdb
import queue
import uuid
import pytest
from mitmproxy.http import HTTPFlow, HTTPRequest, HTTPResponse
from mitmproxy.connections import ClientConnection, ServerConnection
from networkthread import bPacket, bRequest, bResponse, bHeader, NetworkThread, FlowItem, bFlowState
import os
import tempfile
from queue import Queue
import zmq
from deepdiff import DeepDiff
@pytest.fixture
def flowstate_clientconn():
    """Return a dict describing a client connection for the flow-state fixtures.

    The ``id`` is a freshly generated UUID4 string on every call; every other
    field is a fixed value.
    """
    conn_state = {
        "id": str(uuid.uuid4()),
        "address": ("address", 22),
        "source_address": ("address", 22),
        "ip_address": ("192.168.0.1", 22),
        "timestamp_start": 946681202,
        "timestamp_tcp_setup": 946681203,
        "timestamp_tls_setup": 946681204,
        "timestamp_end": 946681205,
        "tls_established": True,
        "sni": "address",
        "alpn": None,
        "tls_version": "TLSv1.2",
        "via": None,
        "state": 0,
        "error": None,
        "tls": False,
        "certificate_list": [],
        "alpn_offers": [],
        "cipher_name": None,
        "cipher_list": [],
        "via2": None,
    }
    return conn_state
@pytest.fixture
def flowstate_serverconn():
    """Return a dict describing a server connection for the flow-state fixtures.

    The ``id`` is a freshly generated UUID4 string on every call; every other
    field is a fixed value.
    """
    conn_state = {
        "id": str(uuid.uuid4()),
        "address": ("address", 22),
        "source_address": ("address", 22),
        "ip_address": ("192.168.0.1", 22),
        "timestamp_start": 946681202,
        "timestamp_tcp_setup": 946681203,
        "timestamp_tls_setup": 946681204,
        "timestamp_end": 946681205,
        "tls_established": True,
        "sni": "address",
        "alpn": None,
        "tls_version": "TLSv1.2",
        "via": None,
        "state": 0,
        "error": None,
        "tls": False,
        "certificate_list": [],
        "alpn_offers": [],
        "cipher_name": None,
        "cipher_list": [],
        "via2": None,
    }
    return conn_state
# usual flow state of the request with some big parts removed
@pytest.fixture
def flowstate_request(flowstate_clientconn, flowstate_serverconn):
    """Return a flow-state dict that carries a request and no response.

    The connection entries come from the ``flowstate_clientconn`` and
    ``flowstate_serverconn`` fixtures; ``response`` is ``None``.
    """
    request_headers = (
        (b'Host', b'yolo.jetzt'),
        (b'User-Agent', b'curl/7.75.0'),
        (b'Accept', b'*/*'),
    )
    request_state = {
        "content": b'',
        "headers": request_headers,
        "host": "yolo.jetzt",
        "http_version": b'HTTP/1.1',
        "method": b'GET',
        "path": b'/',
        "port": 443,
        "scheme": b'https',
        "timestamp_end": 1619390482.69,
        "timestamp_start": 1619390482.6886377,
        "authority": "",
        "trailers": "",
    }
    return {
        "client_conn": flowstate_clientconn,
        "error": None,
        "id": "51215b69-c76f-4ac2-afcb-da3b823d9f88",
        "intercepted": False,
        "is_replay": None,
        "marked": False,
        "metadata": {},
        "mode": "transparent",
        "request": request_state,
        "response": None,
        "server_conn": flowstate_serverconn,
        "type": "http",
        "version": 9,
    }
@pytest.fixture
def flow_request(flowstate_request):
    """Return an ``HTTPFlow`` on dummy connections with only a request attached.

    The request is rebuilt from the ``request`` entry of ``flowstate_request``.
    """
    flow = HTTPFlow(
        ClientConnection.make_dummy("192.168.0.2"),
        ServerConnection.make_dummy("192.168.0.1"),
    )
    request_state = flowstate_request["request"]
    flow.request = HTTPRequest.from_state(request_state)
    return flow
@pytest.fixture
def flowstate_response(flowstate_clientconn, flowstate_serverconn):
    """Return a flow-state dict that carries both a request and a response.

    The connection entries come from the ``flowstate_clientconn`` and
    ``flowstate_serverconn`` fixtures. The response body is an HTML page
    stored as one bytes literal split over many source lines.
    """
    response_body = (
        b'<!DOCTYPE html">\n<html>\n<head> \n<meta http-equi'
        b'v="content-type" content="text/html;charset=ISO-8859'
        b'-1">\n<title>todays yolo - 3026</title> \n<style '
        b'type="text/css" media="screen">\n<!--\nbody \n{\ncol'
        b'or: white;\nbackground-color: #003;\nmargin: 0px\n}'
        b'\n\n.yolo\n{\nfont-family: Verdana, Geneva, Arial, s'
        b'ans-serif;\nfont-weight: bold;\nfont-size: 3em;\n}\n'
        b'\na:visited {\n\tcolor: blue\n}\n\n#content '
        b' \n{\nfont-family: Verdana, Geneva, Arial, sans-se'
        b'rif;\ncolor: white;\nbackground-color: transparent'
        b';\ntext-align: center;\nfont-size: 2em;\nposition: '
        b'absolute;\ntop: 50%;\nleft: 50%;\n-webkit-transform'
        b': translateX(-50%) translateY(-50%);\n-moz-transf'
        b'orm: translateX(-50%) translateY(-50%);\n-ms-tran'
        b'sform: translateX(-50%) translateY(-50%);\ntransf'
        b'orm: translateX(-50%) translateY(-50%);\noverflow'
        b': visible;\nvisibility: visible;\ndisplay: block\n}'
        b'\n\n#sponsors \n{\nfont-family: Verdana, Gene'
        b'va, Arial, sans-serif;\ncolor: white;\nbackground-'
        b'color: transparent;\ntext-align: center;\nfont-siz'
        b'e: 1em;\nposition: absolute;\ntop: 90%;\nleft: 50%;'
        b'\n-webkit-transform: translateX(-50%) translateY('
        b'-50%);\n-moz-transform: translateX(-50%) translat'
        b'eY(-50%);\n-ms-transform: translateX(-50%) transl'
        b'ateY(-50%);\ntransform: translateX(-50%) translat'
        b'eY(-50%);\noverflow: visible;\nvisibility: visible'
        b';\ndisplay: block\n}\n\n#progress\n{\n\tbackground-'
        b'color: #f08;\n\tposition: absolute;\n\tleft: 0px'
        b';\n\ttop: 0px;\n\theight: 10px;\n}\n\n-->\n</sty'
        b'le>\n</head>\n<body>\n<div id="progress" class="pro'
        b'gressBarDiv"></div>\n<div id="content">\nthe yolo '
        b'for today is<br>\n<span class="yolo" id="yoloValu'
        b'e" title="YOLOVALUE">3026</span><br>\n<br>\n</div>'
        b'\n<div id="sponsors">\n\t<a href="https://cat.yolo.'
        b'jetzt">Cat</a>\n\t<br>\n\tRegulation (EU) 2016/679 c'
        b'ompliant\n</div>\n<script type="text/javascript">\n'
        b'var currentYoloTS = 1619388000000; //Multiply by 100'
        b'0 the YOLO way\nvar tDiff = Date.now() - currentY'
        b'oloTS;\n\nvar dayMS = 24 * 60 * 60 * 1000;\n\nconsol'
        b'e.debug(currentYoloTS);\nconsole.debug(Date.now()'
        b' - currentYoloTS);\nfunction checkTimestamps()\n{\n'
        b'\ttDiff = Date.now() - currentYoloTS;\n\tupdateUi()'
        b';\n\tif(tDiff > dayMS)\n\t{\n\t\tclearInterval(upda'
        b'teInterval);\n\t\tlocation.reload();\t\n\t}\n}\n'
        b'\nfunction updateUi()\n{\n\tvalPercent = tDiff / day'
        b'MS * 100;\n\tif(valPercent > 100) valPercent = 100'
        b';\n\tdocument.getElementById("progress").style = "'
        b'width: " + valPercent + "%;";\n\tif(tDiff > dayMS)'
        b'\n\t{\n\t\tdocument.getElementById("yoloValue").i'
        b'nnerHTML = "&#xfffd;";\n\t}\n}\n\nupdateUi();\nvar'
        b' updateInterval = setInterval(checkTimestamps, 5000)'
        b';\n\n//YOLODATE\n</script>\n</body>\n</html>\n'
    )
    response_headers = (
        (b'Server', b'nginx'),
        (b'Date', b'Mon, 26 Apr 2021 18:31:56 GMT'),
        (b'Content-Type', b'text/html'),
        (b'Content-Length', b'2460'),
        (b'Last-Modified', b'Sun, 25 Apr 2021 22:00:00 GMT'),
        (b'Connection', b'keep-alive'),
        (b'ETag', b'"6085e660-99c"'),
        (b'Strict-Transport-Security',
         b'max-age=31536000; includeSubDomains; preload'),
        (b'X-Xss-Protection', b'1; mode=block'),
        (b'X-Content-Type-Options', b'nosniff'),
        (b'Content-Security-Policy',
         b"default-src 'self'; script-src 'self' 'unsafe-in"
         b"line'; connect-src 'self'; img-src 'self'; style"
         b"-src 'self' 'unsafe-inline';"),
        (b'X-Frame-Options', b'SAMEORIGIN'),
        (b'Referrer-Policy', b'no-referrer'),
        (b'Accept-Ranges', b'bytes'),
    )
    request_state = {
        "authority": b'',
        "content": b'',
        "headers": (
            (b'Host', b'yolo.jetzt'),
            (b'User-Agent', b'curl/7.75.0'),
            (b'Accept', b'*/*'),
        ),
        "host": "yolo.jetzt",
        "http_version": b'HTTP/1.1',
        "method": b'GET',
        "path": b'/',
        "port": 443,
        "scheme": b'https',
        "timestamp_end": 1619461916.7603076,
        "timestamp_start": 1619461916.7588415,
        "trailers": None,
    }
    response_state = {
        "content": response_body,
        "headers": response_headers,
        "http_version": b'HTTP/1.1',
        "reason": b'OK',
        "status_code": 200,
        "timestamp_end": 1619461916.7979567,
        "timestamp_start": 1619461916.7935555,
        "trailers": None,
    }
    return {
        "client_conn": flowstate_clientconn,
        "error": None,
        "id": "449d1a87-744f-4a18-9a5d-f085f99a5c62",
        "intercepted": True,
        "is_replay": None,
        "marked": False,
        "metadata": {},
        "mode": "transparent",
        "request": request_state,
        "response": response_state,
        "server_conn": flowstate_serverconn,
        "type": "http",
        "version": 9,
    }
@pytest.fixture
def flow_response(flowstate_request, flowstate_response):
    """Return an ``HTTPFlow`` on dummy connections with request and response set.

    The request is rebuilt from ``flowstate_request`` and the response from
    ``flowstate_response``.
    """
    # NOTE(review): the request comes from flowstate_request, not from the
    # "request" entry of flowstate_response -- confirm this mix is intended.
    flow = HTTPFlow(
        ClientConnection.make_dummy("192.168.0.2"),
        ServerConnection.make_dummy("192.168.0.1"),
    )
    request_state = flowstate_request["request"]
    response_state = flowstate_response["response"]
    flow.request = HTTPRequest.from_state(request_state)
    flow.response = HTTPResponse.from_state(response_state)
    return flow
class MitmAddonTestServer:
    """Test-side wrapper around a ZeroMQ PAIR socket.

    The ``client_server`` fixture points an instance at the same endpoint as
    a ``NetworkThread``.

    NOTE(review): ``send_packet`` delegates to ``self.send``, which is not
    defined on this class in the visible source. Calling ``send_packet`` will
    raise ``AttributeError`` until a ``send`` matching the wire format that
    ``NetworkThread`` expects is provided -- confirm the intended format.
    """

    def __init__(self, queue, path: str):
        """Store the arguments, create a ZeroMQ context and connect.

        Args:
            queue: Queue object, kept on the instance as ``self.queue``.
            path: ZeroMQ endpoint string handed to ``socket.connect``.
        """
        self.queue = queue
        self.path = path
        self.socket = None
        self.context = zmq.Context()
        self.connect()

    def connect(self):
        """Create the PAIR socket and connect it to ``self.path``.

        Idempotent: if a socket is already open the call is a no-op, so a
        repeated ``connect()`` can no longer overwrite ``self.socket`` and
        leak the previous, still-open socket.
        """
        if self.socket is not None:
            return
        self.socket = self.context.socket(zmq.PAIR)
        self.socket.connect(self.path)

    def disconnect(self):
        """Close the socket without lingering; safe when not connected."""
        if self.socket is None:
            return
        # LINGER=0 discards unsent messages so close() does not block.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.close()
        # Reset so a later connect() opens a fresh socket.
        self.socket = None

    def send_packet(self, pkg: "bPacket"):
        """Wrap ``pkg`` in a type/id/data dict and pass it to ``self.send``.

        Args:
            pkg: Packet whose ``ptype``, ``flowid`` and ``data`` attributes
                are copied into the message.
        """
        msg = {"type": pkg.ptype, "id": pkg.flowid, "data": pkg.data}
        self.send(msg)
@pytest.fixture
def client_server():
    """Yield ``(queue, client, server)`` wired to one temporary ipc endpoint.

    ``client`` is a started daemon ``NetworkThread``; ``server`` is a
    ``MitmAddonTestServer`` on the same endpoint. On teardown the thread is
    joined for at most one second, the server socket is closed and the
    temporary directory holding the socket file is removed.
    """
    flow_queue = Queue()
    # TemporaryDirectory removes the directory on exit; the previous
    # mkdtemp() call left one stray directory behind per test.
    with tempfile.TemporaryDirectory() as sock_dir:
        sock_path = os.path.join(sock_dir, "bigsnitchtest.sock")
        endpoint = f"ipc://{sock_path}"
        client = NetworkThread(name="testthread", queue=flow_queue, path=endpoint)
        client.daemon = True
        client.start()
        # MitmAddonTestServer.__init__ already calls connect(); calling it a
        # second time here replaced the socket and leaked the first one.
        server = MitmAddonTestServer(flow_queue, endpoint)
        try:
            yield flow_queue, client, server
        finally:
            client.join(1)
            server.disconnect()
@pytest.fixture
def client_no_start():
    """Yield ``(queue, client)`` where ``client`` is an unstarted NetworkThread.

    The endpoint is an ipc path inside a temporary directory, which is
    removed again once the test finishes.
    """
    flow_queue = Queue()
    # TemporaryDirectory cleans up after the test; the previous mkdtemp()
    # call left one stray directory behind per test.
    with tempfile.TemporaryDirectory() as sock_dir:
        sock_path = os.path.join(sock_dir, "bigsnitchtest.sock")
        endpoint = f"ipc://{sock_path}"
        client = NetworkThread(name="testthread", queue=flow_queue, path=endpoint)
        yield flow_queue, client
class TestBigSnitchWrapper:
    """Tests for converting flow-state dicts into bRequest / bResponse."""

    # Expected text body of the converted response. Both response tests
    # compare against this same value, so it is kept in a single place.
    _EXPECTED_RESPONSE_CONTENT = (
        '<!DOCTYPE html">\n'
        '<html>\n'
        '<head> \n'
        '<meta http-equiv="content-type" '
        'content="text/html;charset=ISO-8859-1">\n'
        '<title>todays yolo - 3026</title> \n'
        '<style type="text/css" media="screen">\n'
        '<!--\n'
        'body \n'
        '{\n'
        'color: white;\n'
        'background-color: #003;\n'
        'margin: 0px\n'
        '}\n'
        '\n'
        '.yolo\n'
        '{\n'
        'font-family: Verdana, Geneva, Arial, sans-serif;\n'
        'font-weight: bold;\n'
        'font-size: 3em;\n'
        '}\n'
        '\n'
        'a:visited {\n'
        '\tcolor: blue\n'
        '}\n'
        '\n'
        '#content \n'
        '{\n'
        'font-family: Verdana, Geneva, Arial, sans-serif;\n'
        'color: white;\n'
        'background-color: transparent;\n'
        'text-align: center;\n'
        'font-size: 2em;\n'
        'position: absolute;\n'
        'top: 50%;\n'
        'left: 50%;\n'
        '-webkit-transform: translateX(-50%) translateY(-50%);\n'
        '-moz-transform: translateX(-50%) translateY(-50%);\n'
        '-ms-transform: translateX(-50%) translateY(-50%);\n'
        'transform: translateX(-50%) translateY(-50%);\n'
        'overflow: visible;\n'
        'visibility: visible;\n'
        'display: block\n'
        '}\n'
        '\n'
        '#sponsors \n'
        '{\n'
        'font-family: Verdana, Geneva, Arial, sans-serif;\n'
        'color: white;\n'
        'background-color: transparent;\n'
        'text-align: center;\n'
        'font-size: 1em;\n'
        'position: absolute;\n'
        'top: 90%;\n'
        'left: 50%;\n'
        '-webkit-transform: translateX(-50%) translateY(-50%);\n'
        '-moz-transform: translateX(-50%) translateY(-50%);\n'
        '-ms-transform: translateX(-50%) translateY(-50%);\n'
        'transform: translateX(-50%) translateY(-50%);\n'
        'overflow: visible;\n'
        'visibility: visible;\n'
        'display: block\n'
        '}\n'
        '\n'
        '#progress\n'
        '{\n'
        '\tbackground-color: #f08;\n'
        '\tposition: absolute;\n'
        '\tleft: 0px;\n'
        '\ttop: 0px;\n'
        '\theight: 10px;\n'
        '}\n'
        '\n'
        '-->\n'
        '</style>\n'
        '</head>\n'
        '<body>\n'
        '<div id="progress" class="progressBarDiv"></div>\n'
        '<div id="content">\n'
        'the yolo for today is<br>\n'
        '<span class="yolo" id="yoloValue" '
        'title="YOLOVALUE">3026</span><br>\n'
        '<br>\n'
        '</div>\n'
        '<div id="sponsors">\n'
        '\t<a href="https://cat.yolo.jetzt">Cat</a>\n'
        '\t<br>\n'
        '\tRegulation (EU) 2016/679 compliant\n'
        '</div>\n'
        '<script type="text/javascript">\n'
        'var currentYoloTS = 1619388000000; //Multiply by 1000 the YOLO '
        'way\n'
        'var tDiff = Date.now() - currentYoloTS;\n'
        '\n'
        'var dayMS = 24 * 60 * 60 * 1000;\n'
        '\n'
        'console.debug(currentYoloTS);\n'
        'console.debug(Date.now() - currentYoloTS);\n'
        'function checkTimestamps()\n'
        '{\n'
        '\ttDiff = Date.now() - currentYoloTS;\n'
        '\tupdateUi();\n'
        '\tif(tDiff > dayMS)\n'
        '\t{\n'
        '\t\tclearInterval(updateInterval);\n'
        '\t\tlocation.reload();\t\n'
        '\t}\n'
        '}\n'
        '\n'
        'function updateUi()\n'
        '{\n'
        '\tvalPercent = tDiff / dayMS * 100;\n'
        '\tif(valPercent > 100) valPercent = 100;\n'
        '\tdocument.getElementById("progress").style = "width: " + '
        'valPercent + "%;";\n'
        '\tif(tDiff > dayMS)\n'
        '\t{\n'
        '\t\tdocument.getElementById("yoloValue").innerHTML = "&#xfffd;";\n'
        '\t}\n'
        '}\n'
        '\n'
        'updateUi();\n'
        'var updateInterval = setInterval(checkTimestamps, 5000);\n'
        '\n'
        '//YOLODATE\n'
        '</script>\n'
        '</body>\n'
        '</html>\n'
    )

    def test_request_convert(self, flowstate_request):
        """bRequest built from a flow state exposes the expected attributes."""
        req = bRequest(flow=flowstate_request)
        expected = {
            'content': '',
            'headers': [
                bHeader(key='Host', value='yolo.jetzt'),
                bHeader(key='User-Agent', value='curl/7.75.0'),
                bHeader(key='Accept', value='*/*'),
            ],
            'host': 'yolo.jetzt',
            'http_version': 'HTTP/1.1',
            'method': 'GET',
            'port': 443,
            'scheme': 'https',
            'server_ip_address': '192.168.0.1',
            'timestamp_end': 1619390482.69,
            'timestamp_start': 1619390482.6886377,
            'tls': True,
        }
        assert not DeepDiff(req.__dict__, expected)

    def test_response_convert(self, flowstate_response):
        """bResponse built from a flow state exposes the expected attributes."""
        res = bResponse(flow=flowstate_response)
        expected = {
            'content': self._EXPECTED_RESPONSE_CONTENT,
            'headers': [
                bHeader(key='Server', value='nginx'),
                bHeader(key='Date', value='Mon, 26 Apr 2021 18:31:56 GMT'),
                bHeader(key='Content-Type', value='text/html'),
                bHeader(key='Content-Length', value='2460'),
                bHeader(key='Last-Modified', value='Sun, 25 Apr 2021 22:00:00 GMT'),
                bHeader(key='Connection', value='keep-alive'),
                bHeader(key='ETag', value='"6085e660-99c"'),
                bHeader(key='Strict-Transport-Security', value='max-age=31536000; includeSubDomains; preload'),
                bHeader(key='X-Xss-Protection', value='1; mode=block'),
                bHeader(key='X-Content-Type-Options', value='nosniff'),
                bHeader(key='Content-Security-Policy', value="default-src 'self'; script-src 'self' 'unsafe-inline'; connect-src 'self'; img-src 'self'; style-src 'self' 'unsafe-inline';"),
                bHeader(key='X-Frame-Options', value='SAMEORIGIN'),
                bHeader(key='Referrer-Policy', value='no-referrer'),
                bHeader(key='Accept-Ranges', value='bytes'),
            ],
            'http_version': 'HTTP/1.1',
            'reason': 'OK',
            'status_code': 200,
            'timestamp_end': 1619461916.7979567,
            'timestamp_start': 1619461916.7935555,
        }
        assert not DeepDiff(res.__dict__, expected)

    def test_convert_request_to_json(self, flowstate_request):
        """bRequest.json() yields the expected dict; headers are one mapping."""
        j = bRequest(flowstate_request).json()
        expected = {
            'content': '',
            'headers': {
                'Accept': '*/*',
                'Host': 'yolo.jetzt',
                'User-Agent': 'curl/7.75.0',
            },
            'host': 'yolo.jetzt',
            'http_version': 'HTTP/1.1',
            'method': 'GET',
            'port': 443,
            'scheme': 'https',
            'server_ip_address': '192.168.0.1',
            'timestamp_end': 1619390482.69,
            'timestamp_start': 1619390482.6886377,
            'tls': True,
        }
        assert not DeepDiff(j, expected)

    def test_convert_response_to_json(self, flowstate_response):
        """bResponse.json() yields the expected dict; headers are a list of
        single-entry mappings."""
        j = bResponse(flowstate_response).json()
        expected = {
            'content': self._EXPECTED_RESPONSE_CONTENT,
            'headers': [
                {'Server': 'nginx'},
                {'Date': 'Mon, 26 Apr 2021 18:31:56 GMT'},
                {'Content-Type': 'text/html'},
                {'Content-Length': '2460'},
                {'Last-Modified': 'Sun, 25 Apr 2021 22:00:00 GMT'},
                {'Connection': 'keep-alive'},
                {'ETag': '"6085e660-99c"'},
                {'Strict-Transport-Security': 'max-age=31536000; '
                                              'includeSubDomains; preload'},
                {'X-Xss-Protection': '1; mode=block'},
                {'X-Content-Type-Options': 'nosniff'},
                {'Content-Security-Policy': "default-src 'self'; script-src "
                                            "'self' 'unsafe-inline'; connect-src "
                                            "'self'; img-src 'self'; style-src "
                                            "'self' 'unsafe-inline';"},
                {'X-Frame-Options': 'SAMEORIGIN'},
                {'Referrer-Policy': 'no-referrer'},
                {'Accept-Ranges': 'bytes'},
            ],
            'http_version': 'HTTP/1.1',
            'reason': 'OK',
            'status_code': 200,
            'timestamp_end': 1619461916.7979567,
            'timestamp_start': 1619461916.7935555,
        }
        assert not DeepDiff(j, expected)
class TestMitmAddon:
def test_get_new_flows_empty(self, client_server):
    """get_new_flows() on an empty queue leaves queue and flows empty."""
    flow_queue, client, _server = client_server

    # Before polling: nothing queued, nothing tracked.
    assert flow_queue.empty()
    assert len(client.flows) == 0

    client.get_new_flows()

    # After polling: still nothing.
    assert flow_queue.empty()
    assert len(client.flows) == 0
def test_get_new_flows_single(self, client_no_start, flow_request):
    """One queued item is drained into client.flows under its flow id."""
    flow_queue, client = client_no_start
    flow_id = flow_request.id
    queued_item = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flow_request)
    flow_queue.put_nowait((flow_id, queued_item))

    client.get_new_flows()

    assert flow_queue.empty()
    assert len(client.flows) == 1
    assert flow_id in client.flows
    stored_item = client.flows[flow_id]
    assert not DeepDiff(queued_item, stored_item)
def test_get_new_flows_double(self, client_no_start, flow_request, flow_response):
    """Two queued items are both drained into client.flows by one call."""
    flow_queue, client = client_no_start
    request_id = flow_request.id
    response_id = flow_response.id

    request_item = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flow_request)
    flow_queue.put_nowait((request_id, request_item))
    response_item = FlowItem(bFlowState.UNSENT_HTTP_RESPONSE, flow_response)
    flow_queue.put_nowait((response_id, response_item))

    client.get_new_flows()

    assert flow_queue.empty()
    assert len(client.flows) == 2
    assert request_id in client.flows
    assert not DeepDiff(request_item, client.flows[request_id])
    assert response_id in client.flows
    assert not DeepDiff(response_item, client.flows[response_id])
def test_request(self, client_server):
queue, client, server = client_server
# create request
#flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flowstate_request)
#self.q.put_nowait(('51215b69-c76f-4ac2-afcb-da3b823d9f88', flowitem))