2021-04-04 14:24:46 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
2021-04-25 19:38:58 +00:00
|
|
|
import pdb
|
|
|
|
|
2021-04-04 14:24:46 +00:00
|
|
|
import pytest
|
2021-04-25 19:38:58 +00:00
|
|
|
from networkthread import NetworkThread
|
|
|
|
import os
|
|
|
|
import tempfile
|
|
|
|
from queue import Queue
|
|
|
|
import zmq
|
|
|
|
|
|
|
|
class MitmAddonTestServer:
|
|
|
|
def __init__(self, queue, path: str):
|
|
|
|
self.queue = queue
|
|
|
|
self.path = path
|
|
|
|
self.socket = None
|
|
|
|
self.context = zmq.Context()
|
|
|
|
self.connect()
|
|
|
|
|
|
|
|
def connect(self):
|
|
|
|
self.socket = self.context.socket(zmq.PAIR)
|
|
|
|
self.socket.connect(self.path)
|
|
|
|
|
|
|
|
def disconnect(self):
|
|
|
|
self.socket.setsockopt(zmq.LINGER,0)
|
|
|
|
self.socket.close()
|
|
|
|
|
|
|
|
def send_packet(self, pkg: bPacket):
|
|
|
|
msg = {"type": pkg.ptype, "id": pkg.flowid, "data": pkg.data}
|
|
|
|
self.send(msg)
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
def client_server():
|
|
|
|
queue = Queue()
|
|
|
|
sock = os.path.join(tempfile.mkdtemp(), "bigsnitchtest.sock")
|
|
|
|
sock = f"ipc://{sock}"
|
|
|
|
client = NetworkThread(name="testthread",queue=queue,path=sock)
|
|
|
|
client.daemon = True
|
|
|
|
client.start()
|
|
|
|
server = MitmAddonTestServer(queue, sock)
|
|
|
|
server.connect()
|
|
|
|
yield client, server
|
|
|
|
client.join(1)
|
|
|
|
server.disconnect()
|
2021-04-04 14:24:46 +00:00
|
|
|
|
2021-04-25 19:38:58 +00:00
|
|
|
class TestBigSnitchWrapper:
|
|
|
|
def test_request_convert(self):
|
|
|
|
pass
|
|
|
|
"""
|
|
|
|
class TestMitmAddon:
|
|
|
|
def test_request(self, client_server):
|
|
|
|
self.client, self.server = client_server
|
|
|
|
# create request
|
|
|
|
flowitem = FlowItem(bFlowState.UNSENT_HTTP_REQUEST, flow)
|
|
|
|
self.q.put_nowait((flow.id, flowitem))
|
|
|
|
"""
|