import io
import http
import errno
import urllib.request
from ssl import SSLSocket, SSLContext
from socket import SocketIO
from urllib.parse import urljoin
import _socket
from pytest_response import response
from pytest_response.exceptions import RemoteBlockedError, ResponseNotFound
from pytest_response.logger import log
__all__ = [
"ResponseSocketIO",
"ResponseSocket",
"Response_SSLSocket",
"ResponseHTTPConnection",
"ResponseHTTPHandler",
"ResponseHTTPResponse",
"ResponseHTTPSConnection",
"ResponseHTTPSHandler",
"install",
"uninstall",
]
EBADF = getattr(errno, "EBADF", 9)
EAGAIN = getattr(errno, "EAGAIN", 11)
EWOULDBLOCK = getattr(errno, "EWOULDBLOCK", 11)
_blocking_errnos = {EAGAIN, EWOULDBLOCK}
CONFIG = {"url": None, "host": None, "https": None, "headers": None, "status": 200}
def _build_url(host, url, headers, https=False):
"""
Internal controller method for building urls.
"""
global CONFIG
_scheme = "https://" if https else "http://"
_url = "".join([_scheme, host])
CONFIG["url"] = urljoin(_url, url)
CONFIG["https"] = https
CONFIG["headers"] = headers
return
[docs]
class ResponseSocketIO(SocketIO):
"""
Provides a method to write the value of the buffer into another var for dumping.
Wrapper for `~socket.SocketIO`.
"""
def __init__(self, sock, mode):
if mode not in ("r", "w", "rw", "rb", "wb", "rwb"):
raise ValueError("invalid mode: %r" % mode)
self.output = io.BytesIO() # internal buffer
super().__init__(sock, mode)
[docs]
def readinto(self, b):
"""
Wrapper function for `socket.SocketIO.readinto`
"""
_ = super().readinto(b)
self.output.write(b.tobytes())
return _
def __del__(self):
if response.capture and response.remote:
global CONFIG
url = CONFIG.get("url")
status = CONFIG.get("status")
response.insert(url=url, response=self.output.getvalue(), headers={}, status=status)
[docs]
class ResponseSocket(_socket.socket):
"""
Socket implementation of Pytest-Response
Provides `ResponseSocket.makefile` method to return a buffer built with `ResponseSocketIO`
"""
def __init__(self, host, port, *args, **kwargs):
self.host = host
self.port = port
self._io_refs = 0
self._closed = False
super().__init__()
self.connect()
[docs]
def connect(self, *args, **kwargs):
"""
Connects to host in capturing mode otherwise passes.
Wrapper for `_socket.socket.connect`
"""
if not response.remote:
log.error(f"remote:{response.remote}")
raise RemoteBlockedError
if not response.response:
super().connect((self.host, self.port), *args, **kwargs)
[docs]
def makefile(
self,
mode="r",
buffering=None,
encoding=None,
errors=None,
newline=None,
*args,
**kwargs,
):
"""
Provides makefile() method which returns a Buffered IO built with `ResponseSocketIO`
"""
# if response.capture:
writing = "w" in mode
reading = "r" in mode or not writing
binary = "b" in mode
rawmode = ""
if reading:
rawmode += "r"
if writing:
rawmode += "w"
raw = ResponseSocketIO(self, rawmode)
if not buffering:
buffering = io.DEFAULT_BUFFER_SIZE
if buffering == 0:
if not binary:
raise ValueError("unbuffered streams must be binary")
return raw
if reading and writing:
buffer = io.BufferedRWPair(raw, raw, buffering)
elif reading:
buffer = io.BufferedReader(raw, buffering)
else:
assert writing
buffer = io.BufferedWriter(raw, buffering)
if binary:
return buffer
text = io.TextIOWrapper(buffer, encoding, errors, newline)
text.mode = mode
return text
def _decref_socketios(self):
if self._io_refs > 0:
self._io_refs -= 1
if self._closed:
self.close()
[docs]
def sendall(self, data, *args, **kwargs):
"""
Wrapper for `_socket.socket.sendall`
"""
if type(data) is not bytes:
data = data.encode("utf-8")
if response.remote and not response.response:
super().sendall(data, *args, **kwargs)
[docs]
class Response_SSLSocket(SSLSocket):
"""
SSLSocket implementation of Pytest-Response
Provides a wrapper `recv_into` for capturing the response.
"""
output = io.BytesIO()
def recv_into(self, buffer, nbytes=None, flags=0):
"""
Wrapper for `SSLSocket.recv_into`
Provides a way to capture the response.
"""
_ = super().recv_into(buffer, nbytes, flags)
self.output.write(buffer.tobytes().rstrip(b"\x00").lstrip(b"\x00"))
return _
def __del__(self):
if response.capture and response.remote:
global CONFIG
url = CONFIG.get("url")
status = CONFIG.get("status")
response.insert(url=url, response=self.output.getvalue(), headers={}, status=status)
pass
[docs]
class ResponseHTTPResponse(http.client.HTTPResponse):
"""
Provides a way to capture or respond with a saved response.
"""
def __init__(self, sock, debuglevel=0, method=None, headers=None):
self.sock = sock
self.output = io.BytesIO()
super().__init__(sock=sock, debuglevel=debuglevel, method=method)
[docs]
def begin(self, *args, **kwargs):
if not response.remote:
log.error(f"remote:{response.remote}")
raise RemoteBlockedError
if response.response:
global CONFIG
self.fp = io.BytesIO()
status, data, headers = response.get(url=CONFIG.get("url", ""))
if not data:
self.code = self.status = 404
self.reason = "Response Not Found (pytest-response)"
self.will_close = True
log.error(f"Response not found {CONFIG.get('url', '')}")
raise ResponseNotFound
self.output.write(b"HTTP/1.0 " + str(status).encode("ISO-8859-1") + b"\n")
self.output.write(data)
self.will_close = False
self.fp = self.output
self.fp.seek(0)
super().begin(*args, **kwargs)
pass
[docs]
class ResponseHTTPConnection(http.client.HTTPConnection):
"""
Wrapper for `~http.client.HTTPConnection`
"""
response_class = ResponseHTTPResponse
[docs]
def request(self, method, url, body=None, headers={}, *, encode_chunked=False):
"""Send a complete request to the server."""
_build_url(self.host, url, headers, False)
self._send_request(method, url, body, headers, encode_chunked)
[docs]
def connect(self):
"""
Override the connect() function to intercept calls.
"""
if not response.remote:
log.error(f"Attempt to connect. remote:{response.remote}")
raise RemoteBlockedError
try:
log.debug("Intercepting call to %s:%s\n" % (self.host, self.port))
self.sock = ResponseSocket(self.host, self.port)
except Exception:
raise
pass
[docs]
class ResponseHTTPHandler(urllib.request.HTTPHandler):
"""
Override the default HTTPHandler class with one that uses the
ResponseHTTPConnection class to open HTTP URLs.
"""
[docs]
def http_open(self, req):
return self.do_open(ResponseHTTPConnection, req)
pass
[docs]
class ResponseHTTPSConnection(http.client.HTTPSConnection, ResponseHTTPConnection):
"""
Override the default `~HTTPSConnection` to use `~ResponseSocket`
"""
[docs]
def request(self, method, url, body=None, headers={}, *, encode_chunked=False):
"""Send a complete request to the server."""
_build_url(self.host, url, headers, True)
self._send_request(method, url, body, headers, encode_chunked)
[docs]
def connect(self):
if not response.remote:
log.error(f"Attempting to connect. remote:{response.remote}")
raise RemoteBlockedError
log.debug("Intercepting call to %s:%s\n" % (self.host, self.port))
self.sock = ResponseSocket(
host=self.host,
port=self.port,
https=True,
timeout=self.timeout,
source_address=self.source_address,
)
self.sock.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1)
if response.capture:
SSLContext.sslsocket_class = Response_SSLSocket
if not response.response:
if self._tunnel_host:
self._tunnel()
if self._tunnel_host:
server_hostname = self._tunnel_host
else:
server_hostname = self.host
self.sock = self._context.wrap_socket(self.sock, server_hostname=server_hostname)
pass
[docs]
class ResponseHTTPSHandler(urllib.request.HTTPSHandler):
"""
Override the default HTTPSHandler class with one that uses the
ResponseHTTPSConnection class to open HTTP URLs.
"""
[docs]
def https_open(self, req):
return self.do_open(ResponseHTTPSConnection, req)
def install_opener():
handlers = [ResponseHTTPHandler(), ResponseHTTPSHandler()]
opener = urllib.request.build_opener(*handlers)
urllib.request.install_opener(opener)
return opener
def uninstall_opener():
urllib.request.install_opener(None)
install = install_opener
uninstall = uninstall_opener