import base64
import os
import re
import socket
import time
from collections import namedtuple
from logging import getLogger
from typing import Callable
from urllib.parse import urlencode, urlparse
from frozendict import frozendict as fd
from tornado.httpclient import AsyncHTTPClient, HTTPError
from hgraph import service_adaptor, TS, service_adaptor_impl, TSD, push_queue, GlobalState, sink_node
from hgraph.adaptors.tornado._tornado_web import TornadoWeb
from hgraph.adaptors.tornado.http_server_adaptor import (
HttpRequest,
HttpResponse,
HttpPostRequest,
HttpGetRequest,
HttpPutRequest,
HttpDeleteRequest,
)
# Module-level logger, named after this module; used by the adaptor implementation below.
logger = getLogger(__name__)
class Credentials:
    """
    Holds a username/password pair.

    Both ``repr()`` and ``str()`` return the fixed text ``credentials`` so that neither the
    username nor the password appears when an instance is formatted (for example in a log line).
    """

    def __init__(self, username, password):
        self.username, self.password = username, password

    def __repr__(self):
        # Constant text on purpose: never expose the stored values.
        return "credentials"

    def __str__(self):
        # Same constant text as __repr__.
        return "credentials"
# Matches text of the form 'Authorization': '<scheme> <token>' (single-quoted, as in the repr of a
# dict) for the Bearer, Basic, Digest, Negotiate and NTLM schemes. It is applied in
# ``make_http_request`` to replace such text with a redacted placeholder before logging.
authorization_match = re.compile(r"'Authorization':\s*'(?:Bearer|Basic|Digest|Negotiate|NTLM)\s+[^']+'")
# [docs]  (appears to be a documentation "view source" link label left over from extraction; not code)
@service_adaptor
def http_client_adaptor(request: TS[HttpRequest], path: str = "http_client") -> TS[HttpResponse]:
    """
    Sends requests to a remote server. The result is returned as an HttpResponse object.
    Supports Get, Put, Delete and Post requests.

    To use the adaptor you need to register the service impl. The default provided implementation is
    ``http_client_adaptor_impl``.

    :param request: The time-series of requests to send.
    :param path: The service path of the adaptor (defaults to ``"http_client"``); presumably this must match
        the path the implementation is registered with - confirm against the registration code.
    :return: The time-series of responses to the requests.
    """
# [docs]  (appears to be a documentation "view source" link label left over from extraction; not code)
@service_adaptor_impl(interfaces=http_client_adaptor)
def http_client_adaptor_impl(
    request: TSD[int, TS[HttpRequest]], path: str = "http_client", use_curl: bool = False, max_clients: int = 50
) -> TSD[int, TS[HttpResponse]]:
    """
    The client adaptor is responsible for making HTTP requests to a remote server. This implementation is able to
    support NTLM and Kerberos authentication.

    Each modified request is scheduled on the Tornado loop; the response (or an error response) is pushed back
    into the graph keyed by the same integer id.

    :param request: The requests to perform, keyed by request id.
    :param path: Used to build the ``GlobalState`` key (``http_client_adaptor://{path}/queue``) under which the
        push-queue sender is stored and looked up.
    :param use_curl: When True, Tornado is configured to use ``CurlAsyncHTTPClient``.
    :param max_clients: Passed to ``AsyncHTTPClient.configure``; only applied when ``use_curl`` is True.
    :return: The responses, keyed by request id.
    """
    if use_curl:
        AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=max_clients)

    logger.info("Starting client adaptor on path: '%s'", path)

    # Heuristic pattern for a base64 run; note that it can also match the empty string.
    base64_token = re.compile(r"(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?")

    @push_queue(TSD[int, TS[HttpResponse]])
    def from_web(sender, path: str = "http_client") -> TSD[int, TS[HttpResponse]]:
        # Publish the sender so that ``to_web`` can find it and push responses into the graph.
        GlobalState.instance()[f"http_client_adaptor://{path}/queue"] = sender
        return None

    def canonical_hostname(url):
        """Return the canonical name of the host in ``url``, or the plain host name if resolution fails."""
        host = urlparse(url).hostname
        try:
            info = socket.getaddrinfo(host, None, 0, 0, 0, socket.AI_CANONNAME)
            host = info[0][3]
        except socket.gaierror as error:
            logger.info("Skipping canonicalization of name %s due to error: %s", host, error)
        return host

    def scheme_challenges(header, scheme):
        """Return the text following ``scheme`` for each comma separated entry of ``header`` that mentions it."""
        return [v[len(scheme) + 1 :] for val in header.split(",") if scheme in (v := val.strip())]

    async def handle_auth_win(response, request, client):
        """
        Answer a 401 challenge using SSPI (Negotiate or NTLM) with the credentials of the current Windows user.

        :param response: The 401 response that carries the ``WWW-Authenticate`` header.
        :param request: The originating HttpRequest (not used by this handler).
        :param client: The AsyncHTTPClient used to re-send the request.
        :return: The response obtained after authenticating, or the most recent response when SSPI reports an error.
        :raises HTTPError: (401) when the challenge is missing, unsupported or ambiguous.
        """
        import pywintypes
        import sspi
        import sspicon
        import win32security

        auth_header = response.headers.get("www-authenticate")
        if not auth_header:
            raise HTTPError(401, "missing www-authenticate header")

        auth_header = auth_header.lower()
        if "negotiate" in auth_header:
            scheme = "Negotiate"
        elif "ntlm" in auth_header:
            scheme = "NTLM"
        else:
            raise HTTPError(401, "unhandled protocol")

        host = canonical_hostname(response.request.url)
        targetspn = f"HTTP/{host}"
        scflags = sspicon.ISC_REQ_MUTUAL_AUTH
        pkg_info = win32security.QuerySecurityPackageInfo(scheme)
        clientauth = sspi.ClientAuth(
            scheme, targetspn=targetspn, auth_info=None, scflags=scflags, datarep=sspicon.SECURITY_NETWORK_DREP
        )
        sec_buffer = win32security.PySecBufferDescType()

        # handling HTTPS connection will need peercert handling here

        set_cookie = response.headers.get("set-cookie")
        if set_cookie is not None:
            response.request.headers["Cookie"] = set_cookie

        # Step 1: send the initial token.
        try:
            err, auth = clientauth.authorize(sec_buffer)
            data = base64.b64encode(auth[0].Buffer).decode("ASCII")
            response.request.headers["Authorization"] = f"{scheme} {data}"
        except pywintypes.error as error:
            # pywintypes.error is not subscriptable on Python 3; use its named attributes.
            logger.error("Error calling %s: %s", error.funcname, error.strerror, exc_info=error)
            return response

        response2 = await client.fetch(response.request, raise_error=False)

        if response2.code != 401:
            # Authenticated in a single round trip; process the server's final token if one was supplied.
            final = response2.headers.get("WWW-Authenticate")
            if final is not None:
                try:
                    if scheme in final:
                        challenge = scheme_challenges(final, scheme)
                    else:
                        challenge = [val.strip() for val in final.split(",")]
                    if len(challenge) > 1:
                        raise HTTPError(401, f"Received more than one {scheme} challenge from server")
                    tokenbuf = win32security.PySecBufferType(pkg_info["MaxToken"], sspicon.SECBUFFER_TOKEN)
                    tokenbuf.Buffer = base64.b64decode(challenge[0])
                    sec_buffer.append(tokenbuf)
                    err, auth = clientauth.authorize(sec_buffer)
                    logger.debug(
                        "Kerberos Authentication succeeded - error=%s authenticated=%s", err, clientauth.authenticated
                    )
                except TypeError as error:
                    # Best effort only: the response is still returned to the caller.
                    logger.debug("Skipping processing of the final %s token: %s", scheme, error)
            return response2

        # Step 2: the server replied with a challenge that has to be answered.
        set_cookie = response2.headers.get("set-cookie")
        if set_cookie is not None:
            response2.request.headers["Cookie"] = set_cookie

        www_authenticate = response2.headers.get("WWW-Authenticate", "")
        challenge = scheme_challenges(www_authenticate, scheme)
        if len(challenge) > 1:
            raise HTTPError(401, f"Received more than one {scheme} challenge from server")
        if not challenge:
            # Fall back to the longest base64-looking run in the header; empty matches are discarded.
            candidates = [m for m in base64_token.findall(www_authenticate) if m]
            if not candidates:
                raise HTTPError(
                    401, f"Could not find any {scheme} challenge in WWW-Authenticate header: {www_authenticate}"
                )
            challenge = [max(candidates, key=len)]

        tokenbuf = win32security.PySecBufferType(pkg_info["MaxToken"], sspicon.SECBUFFER_TOKEN)
        tokenbuf.Buffer = base64.b64decode(challenge[0])
        sec_buffer.append(tokenbuf)

        try:
            err, auth = clientauth.authorize(sec_buffer)
            data = base64.b64encode(auth[0].Buffer).decode("ASCII")
            response2.request.headers["Authorization"] = f"{scheme} {data}"
        except pywintypes.error as error:
            logger.error("Error calling %s: %s", error.funcname, error.strerror, exc_info=error)
            return response2

        response3 = await client.fetch(response2.request, raise_error=False)
        return response3

    async def handle_auth(response, request, client):
        """
        Answer a 401 challenge using ``spnego`` (Kerberos for Negotiate, NTLM with explicit credentials).

        :param response: The 401 response that carries the ``WWW-Authenticate`` header.
        :param request: The originating HttpRequest; ``request.auth`` must be a ``Credentials`` instance for NTLM.
        :param client: The AsyncHTTPClient used to re-send the request.
        :return: The first non-401 response obtained.
        :raises HTTPError: (401) when the challenge cannot be handled or authentication does not succeed
            within two round trips.
        """
        import spnego

        auth_header = response.headers.get("www-authenticate")
        if not auth_header:
            raise HTTPError(401, "missing www-authenticate header")

        # Parse auth header to handle case when both NTLM and Negotiate are present.
        # Only the scheme detection is case-insensitive: tokens are base64 and must keep their case.
        offered = auth_header.lower()
        if "negotiate" in offered:
            scheme = "Negotiate"
            protocol = "kerberos"
            username = None
            password = None
        elif "ntlm" in offered:
            scheme = "NTLM"
            protocol = "ntlm"
            if request.auth is not None and isinstance(request.auth, Credentials):
                username = request.auth.username
                password = request.auth.password
            else:
                raise HTTPError(
                    401, "NTLM Authentication on non-windows hosts is not supported without supplying credentials"
                )
        else:
            raise HTTPError(401, "unhandled protocol")

        host = canonical_hostname(response.request.url)

        ctx = spnego.client(
            username=username,
            password=password,
            hostname=host,
            service="HTTP",
            channel_bindings=None,
            context_req=spnego.ContextReq.sequence_detect | spnego.ContextReq.mutual_auth,
            protocol=protocol,
        )

        for _ in range(2):
            # The header can be absent on the second pass; treat that the same as "no token".
            auth_req = re.search(f"{scheme}\\s*([^,]*)", auth_header or "", re.I)
            if auth_req is None:
                raise HTTPError(401, "No auth token found")

            gss_r = ctx.step(in_token=base64.b64decode(auth_req[1]))
            response.request.headers["Authorization"] = f"{scheme} {base64.b64encode(gss_r).decode()}"

            # handling HTTPS connection will need peercert handling here

            set_cookie = response.headers.get("set-cookie")
            if set_cookie is not None:
                response.request.headers["Cookie"] = set_cookie

            response2 = await client.fetch(response.request, raise_error=False)

            if response2.code != 401:
                final = response2.headers.get("WWW-Authenticate")
                if final is not None:
                    try:
                        scheme_match = re.search(f"{scheme}\\s*([^,]*)", final, re.I)
                        if scheme_match is not None:
                            token = scheme_match[1]
                        else:
                            # Try to find any Base64-encoded token in the header (longest run wins).
                            token = max(base64_token.findall(final), key=len, default="")
                            if len(token) <= 8:
                                raise HTTPError(401, f"No valid auth token found in header: {final}")
                        ctx.step(in_token=base64.b64decode(token))
                    except spnego.exceptions.SpnegoError as error:
                        logger.error("authenticate_server(): ctx step() failed:", exc_info=True)
                        raise HTTPError(401, "Kerberos Authentication failed") from error
                return response2

            # Still unauthorised: go round again with the new challenge.
            response = response2
            auth_header = response.headers.get("www-authenticate")

        raise HTTPError(401, f"Kerberos Authentication failed: {response}")

    async def make_http_request(id: int, request: HttpRequest, sender: Callable):
        """
        Perform ``request`` and report the outcome to ``sender`` as ``{id: HttpResponse}``.

        Failures are reported as responses rather than raised: an authentication ``HTTPError`` is sent with
        its own status code, any other exception is sent with status code 400.
        """
        start_time = time.perf_counter_ns()
        client = None
        try:
            client = AsyncHTTPClient(force_instance=True)

            url = f"{request.url}?{urlencode(request.query)}" if request.query else request.url
            log_url = authorization_match.sub("'Authorization': '[REDACTED]'", url)
            timeouts = {"connect_timeout": request.connect_timeout, "request_timeout": request.request_timeout}

            # The order of the checks is kept as-is in case the request types are related by inheritance.
            if isinstance(request, HttpGetRequest):
                method, has_body = "GET", False
            elif isinstance(request, HttpPostRequest):
                method, has_body = "POST", True
            elif isinstance(request, HttpPutRequest):
                method, has_body = "PUT", True
            elif isinstance(request, HttpDeleteRequest):
                method, has_body = "DELETE", False
            else:
                method, has_body = None, False

            if method is None:
                logger.error("Bad request received: %s", request)
                response = namedtuple("HttpResponse_", ["code", "headers", "body"])(
                    400, fd(), b"Incorrect request type provided"
                )
            elif has_body:
                logger.debug("[%s][%i][%s] body: %s", method, id, log_url, request.body)
                response = await client.fetch(
                    url, method=method, headers=request.headers, body=request.body, raise_error=False, **timeouts
                )
            else:
                logger.debug("[%s][%i][%s]", method, id, log_url)
                response = await client.fetch(
                    url, method=method, headers=request.headers, raise_error=False, **timeouts
                )

            if response.code == 401 and response.headers.get("WWW-Authenticate") is not None:
                logger.debug("[AUTH] requesting authentication")
                try:
                    if os.name == "nt":
                        response = await handle_auth_win(response, request, client)
                    else:
                        response = await handle_auth(response, request, client)
                except HTTPError as e:
                    logger.error("[AUTH] authentication failed: %s", e)
                    sender({id: HttpResponse(status_code=e.code, body=e.message.encode())})
                    return
        except Exception as e:
            logger.error("request %i failed : %s", id, e)
            sender({id: HttpResponse(status_code=400, body=str(e).encode())})
            return
        finally:
            # A client created with force_instance=True is not shared or reused, so it has to be
            # closed explicitly to release its resources.
            if client is not None:
                client.close()

        logger.info("request %i succeeded in %i ms", id, (time.perf_counter_ns() - start_time) // 1_000_000)
        sender({id: HttpResponse(status_code=response.code, headers=response.headers, body=response.body)})

    @sink_node
    def to_web(request: TSD[int, TS[HttpRequest]]):
        # Schedule every modified request on the Tornado loop.
        sender = GlobalState.instance()[f"http_client_adaptor://{path}/queue"]
        for i, r in request.modified_items():
            TornadoWeb.get_loop().add_callback(make_http_request, i, r.value, sender)

    @to_web.start
    def to_web_start():
        TornadoWeb.start_loop()

    @to_web.stop
    def to_web_stop():
        TornadoWeb.stop_loop()

    to_web(request)
    # Register the queue under the same path that ``to_web`` reads it from; relying on the default
    # of ``from_web`` only works when ``path`` is "http_client".
    return from_web(path=path)