#!/usr/bin/env python3
"""Tests for the ``networkthread`` module.

NOTE(review): this file was recovered from a copy whose line structure had
been collapsed and whose middle section was corrupted. Everything that could
be restored unambiguously is live code; the corrupted region is preserved
verbatim in a comment block below and must be restored from version control.
"""
import os
import pdb
import queue
import tempfile
from queue import Queue

import pytest
import zmq
from deepdiff import DeepDiff

from networkthread import (
    bPacket,
    bRequest,
    bResponse,
    bHeader,
    NetworkThread,
    FlowItem,
    bFlowState,
)


# usual flow state of the request with some big parts removed
@pytest.fixture
def flowstate_request():
    """Return a flow-state dict for a GET to ``yolo.jetzt`` with no response.

    The ``'response'`` entry is ``None`` and the certificate fields
    (``'mitmcert'``, ``'cert'``) are empty strings.
    """
    return {
        'client_conn': {
            'address': ('::ffff:127.0.0.1', 60630, 0, 0),
            'alpn_proto_negotiated': b'http/1.1',
            'cipher_name': 'TLS_AES_256_GCM_SHA384',
            'clientcert': None,
            'id': '5dde7ef8-9b1a-4b60-9d15-d308442a27ea',
            'mitmcert': '',
            'sni': 'yolo.jetzt',
            'timestamp_end': None,
            'timestamp_start': 1619390481.8003347,
            'timestamp_tls_setup': 1619390482.6879823,
            'tls_established': True,
            'tls_extensions': [],
            'tls_version': 'TLSv1.3',
        },
        'error': None,
        'id': '51215b69-c76f-4ac2-afcb-da3b823d9f88',
        'intercepted': False,
        'is_replay': None,
        'marked': False,
        'metadata': {},
        'mode': 'transparent',
        'request': {
            'authority': b'',
            'content': b'',
            'headers': (
                (b'Host', b'yolo.jetzt'),
                (b'User-Agent', b'curl/7.75.0'),
                (b'Accept', b'*/*'),
            ),
            'host': 'yolo.jetzt',
            'http_version': b'HTTP/1.1',
            'method': b'GET',
            'path': b'/',
            'port': 443,
            'scheme': b'https',
            'timestamp_end': 1619390482.69,
            'timestamp_start': 1619390482.6886377,
            'trailers': None,
        },
        'response': None,
        'server_conn': {
            'address': ('yolo.jetzt', 443),
            'alpn_proto_negotiated': b'http/1.1',
            'cert': '',
            'id': 'ecc4cd3b-7e35-4815-b618-5931fe64729b',
            'ip_address': ('95.156.226.69', 443),
            'sni': 'yolo.jetzt',
            'source_address': ('192.168.42.182', 51514),
            'timestamp_end': None,
            'timestamp_start': 1619390481.8154442,
            'timestamp_tcp_setup': 1619390481.994565,
            'timestamp_tls_setup': 1619390482.6819758,
            'tls_established': True,
            'tls_version': 'TLSv1.2',
            'via': None,
        },
        'type': 'http',
        'version': 9,
    }


# ---------------------------------------------------------------------------
# TODO(review): CORRUPTED REGION - restore from version control.
#
# The text below is what survived of the ``flowstate_response`` fixture and of
# whatever followed it. It cannot be turned back into code without guessing:
#   * the ``'content'`` value joins a bytes literal (b'...') with plain str
#     literals, which Python rejects, and the literals themselves are broken
#     across lines with their markup-like payload missing;
#   * the dict is never closed before ``assert not DeepDiff(res.__dict__, d)``,
#     and neither ``res`` nor ``d`` is defined anywhere in the surviving text.
# It looks like the start of the fixture and the tail of a later test were
# fused when the content in between was lost - confirm against the original.
# The residue is reproduced verbatim (only re-wrapped) so nothing is dropped.
#
# @pytest.fixture()
# def flowstate_response():
#     return {'client_conn': {'address': ('::ffff:127.0.0.1', 30190, 0, 0),
#     'alpn_proto_negotiated': b'http/1.1',
#     'cipher_name': 'TLS_AES_256_GCM_SHA384',
#     'clientcert': None, 'id': '2507e6ce-3132-4394-9432-f55fb5f55b05',
#     'mitmcert': '', 'sni': 'yolo.jetzt', 'timestamp_end': None,
#     'timestamp_start': 1619461916.6160116,
#     'timestamp_tls_setup': 1619461916.7581937, 'tls_established': True,
#     'tls_extensions': [], 'tls_version': 'TLSv1.3'}, 'error': None,
#     'id': '449d1a87-744f-4a18-9a5d-f085f99a5c62', 'intercepted': True,
#     'is_replay': None, 'marked': False, 'metadata': {},
#     'mode': 'transparent',
#     'request': {'authority': b'', 'content': b'',
#     'headers': ((b'Host', b'yolo.jetzt'), (b'User-Agent', b'curl/7.75.0'),
#     (b'Accept', b'*/*')), 'host': 'yolo.jetzt',
#     'http_version': b'HTTP/1.1', 'method': b'GET', 'path': b'/',
#     'port': 443, 'scheme': b'https',
#     'timestamp_end': 1619461916.7603076,
#     'timestamp_start': 1619461916.7588415, 'trailers': None},
#     'response': {'content': b'\n\n \n\ntodays yolo - 3026 \n\n' '\n' '\n' '
#     \n' '
#     \n' 'the yolo for today is
#     \n' '3026
#     \n' '
#     \n' '
#     \n' '
#     \n' '\tCat\n' '\t
#     \n' '\tRegulation (EU) 2016/679 compliant\n' '
#     \n' '\n' '\n' '\n',
#     'headers': [bHeader(key='Server', value='nginx'),
#     bHeader(key='Date', value='Mon, 26 Apr 2021 18:31:56 GMT'),
#     bHeader(key='Content-Type', value='text/html'),
#     bHeader(key='Content-Length', value='2460'),
#     bHeader(key='Last-Modified', value='Sun, 25 Apr 2021 22:00:00 GMT'),
#     bHeader(key='Connection', value='keep-alive'),
#     bHeader(key='ETag', value='"6085e660-99c"'),
#     bHeader(key='Strict-Transport-Security', value='max-age=31536000; includeSubDomains; preload'),
#     bHeader(key='X-Xss-Protection', value='1; mode=block'),
#     bHeader(key='X-Content-Type-Options', value='nosniff'),
#     bHeader(key='Content-Security-Policy', value="default-src 'self'; script-src 'self' 'unsafe-inline'; connect-src 'self'; img-src 'self'; style-src 'self' 'unsafe-inline';"),
#     bHeader(key='X-Frame-Options', value='SAMEORIGIN'),
#     bHeader(key='Referrer-Policy', value='no-referrer'),
#     bHeader(key='Accept-Ranges', value='bytes')],
#     'http_version': 'HTTP/1.1', 'reason': 'OK', 'status_code': 200,
#     'timestamp_end': 1619461916.7979567,
#     'timestamp_start': 1619461916.7935555}
#     assert not DeepDiff(res.__dict__, d)
# ---------------------------------------------------------------------------


class TestMitmAddon:
    """Tests that unpack a ``(queue, client, server)`` triple from ``client_server``.

    NOTE(review): the ``client_server`` fixture is not defined in the
    recovered part of this file; it presumably lives in the lost region or in
    a ``conftest.py`` - confirm.
    """

    def test_get_new_flows_empty(self, client_server):
        """``get_new_flows`` with nothing queued leaves queue and flows empty."""
        # Named ``flow_queue`` so the imported ``queue`` module is not shadowed.
        flow_queue, client, _server = client_server

        # queue empty, flows empty
        assert flow_queue.empty()
        assert len(client.flows) == 0

        client.get_new_flows()

        # afterwards too
        assert flow_queue.empty()
        assert len(client.flows) == 0

    def test_get_new_flows_single(self, client_server):
        """Placeholder: unpacks the fixture but asserts nothing yet."""
        # TODO: enqueue a single flow and check it shows up in client.flows.
        flow_queue, client, _server = client_server

    def test_request(self, client_server):
        """Placeholder: unpacks the fixture but asserts nothing yet."""
        flow_queue, client, _server = client_server
        # create request
        #flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flowstate_request)
        #self.q.put_nowait(('51215b69-c76f-4ac2-afcb-da3b823d9f88', flowitem))