import json
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Generic, TypeVar
from frozendict import frozendict
from hgraph import (
CompoundScalar,
COMPOUND_SCALAR,
graph,
TS,
TSD,
HgTSBTypeMetaData,
combine,
map_,
convert,
OUT,
compute_node,
dispatch_,
AUTO_RESOLVE,
from_json,
nothing,
operator,
DEFAULT,
Base,
to_json_builder,
with_signature,
ts_schema,
TSB,
from_json_builder,
)
__all__ = (
"REST_RESPONSE",
"RestCreateRequest",
"RestCreateResponse",
"RestDeleteRequest",
"RestDeleteResponse",
"RestListRequest",
"RestListResponse",
"RestReadRequest",
"RestReadResponse",
"RestRequest",
"RestResponse",
"RestResultEnum",
"RestUpdateRequest",
"RestUpdateResponse",
"rest_handler",
)
from hgraph.adaptors.tornado.http_server_adaptor import (
HttpRequest,
HttpResponse,
HttpGetRequest,
HttpPostRequest,
HttpPutRequest,
HttpDeleteRequest,
http_server_handler,
)
[docs]
class RestResultEnum(Enum):
OK = 200
CREATED = 201 # From a POST
NO_CONTENT = 204 # Success but there is no body-content
BAD_REQUEST = 400 # Response with validation error
UNAUTHORIZED = 401
FORBIDDEN = 403 # Even if authorised, the user cannot perform the action
NOT_FOUND = 404
REQUEST_TIMED_OUT = 408
CONFLICT = 409 # For example, trying to create an object that already exists
TOO_MANY_REQUESTS = 429
INTERNAL_SERVER_ERROR = 500 # A problem (for example, an exception has occurred)
NOT_IMPLEMENTED = 501
BAD_GATEWAY = 502
SERVICE_UNAVAILABLE = 503
GATEWAY_TIMEOUT = 504
HTTP_VERSION_NOT_SUPPORTED = 505
TORNADO_TIMED_OUT = 599 # Unofficially used by Tornado to indicate network time-out
[docs]
@dataclass(frozen=True)
class RestRequest(CompoundScalar):
"""Marker class for all rest operations"""
url: str
REST_REQUEST = TypeVar("REST_REQUEST", bound=RestRequest)
[docs]
@dataclass(frozen=True)
class RestCreateRequest(RestRequest, Generic[COMPOUND_SCALAR]):
"""The value associated to the id should be created"""
id: str
value: COMPOUND_SCALAR
[docs]
@dataclass(frozen=True)
class RestUpdateRequest(RestRequest, Generic[COMPOUND_SCALAR]):
"""The value associated to the id should be updated"""
id: str
value: COMPOUND_SCALAR
[docs]
@dataclass(frozen=True)
class RestReadRequest(RestRequest):
"""The id is requested to have it's value returned"""
id: str
[docs]
@dataclass(frozen=True)
class RestDeleteRequest(RestRequest):
"""The id is requested to be removed"""
id: str
[docs]
@dataclass(frozen=True)
class RestListRequest(RestRequest):
"""No attributes provided"""
[docs]
@dataclass(frozen=True)
class RestResponse(CompoundScalar):
status: RestResultEnum
reason: str = "" # Populated when OK (or success equivalent) is NOT set as the status
REST_RESPONSE = TypeVar("REST_RESPONSE", bound=RestResponse)
REST_RESPONSE_ = TypeVar("REST_RESPONSE_", bound=RestResponse)
[docs]
@dataclass(frozen=True)
class RestCreateResponse(RestResponse, Generic[COMPOUND_SCALAR]):
"""
The status property should be set as follows:
* If successfully created, this should set the status to CREATED.
* If the value already exists, this should be set to CONFLICT.
* If the attempt fails due to a validation error, this should be set to BAD_REQUEST.
* The user is not authorised, this should be set to UNAUTHORIZED
* If the task cannot be completed, this should be set to FORBIDDEN.
The value is returned when the value is modified during the construction process.
The id will always be provided if the value is created.
"""
id: str = None
value: COMPOUND_SCALAR = None
[docs]
@dataclass(frozen=True)
class RestReadResponse(RestResponse, Generic[COMPOUND_SCALAR]):
"""
The status property should be set as follows:
* If found, this should be set to OK.
* If the content is not found, this should be set to NOT_FOUND.
* The user is not authorised, this should be set to UNAUTHORIZED
* If the task cannot be completed, this should be set to FORBIDDEN.
The id and value are returned if found.
"""
id: str = None
value: COMPOUND_SCALAR = None
[docs]
@dataclass(frozen=True)
class RestUpdateResponse(RestResponse, Generic[COMPOUND_SCALAR]):
"""
The status property should be set as follows:
* If successfully updated, this should set the status to OK if the value is going to be returned or NO_CONTENT if
no value is to be returned.
* If the content is not found, this should be set to NOT_FOUND.
* If the attempt fails due to a validation error, this should be set to BAD_REQUEST.
* The user is not authorised, this should be set to UNAUTHORIZED
* If the task cannot be completed, this should be set to FORBIDDEN.
The id and value should be set when the value is modified during the update process.
"""
id: str = None
value: COMPOUND_SCALAR = None
[docs]
@dataclass(frozen=True)
class RestDeleteResponse(RestResponse):
"""
The status property should be set as follows:
* If successfully deleted, this should set the status to NO_CONTENT.
* If the content is not found, this should be set to NOT_FOUND.
* The user is not authorised, this should be set to UNAUTHORIZED
* If the task cannot be completed, this should be set to FORBIDDEN.
"""
[docs]
@dataclass(frozen=True)
class RestListResponse(RestResponse):
"""
Returns the list of id's available.
The status property should be set as follows:
* If the operation is successfully performed, this should set the status to OK.
* The user is not authorised, this should be set to UNAUTHORIZED
* If the task cannot be completed, this should be set to FORBIDDEN.
"""
ids: tuple[str, ...] = tuple()
[docs]
def rest_handler(fn: Callable = None, *, url: str, data_type: type[COMPOUND_SCALAR]):
"""
A rest handler wraps a function of the form:
::
@rest_handler(url="http://example.com/my_cs", data_type=MyCS)
def my_fn(request: TS[RestRequest[MyCS]) -> TS[RestResponse[MyCS]]:
...
The function is responsible for handling individual requests and producing the
appropriate responses. If the component requires full visibility to the full set
of requests, then it is possible for the multiplexed signature to be used:
::
@rest_handler(url="http://example.com/my_cs", data_type=MyCS)
def my_fn(request: TSD[int, TS[RestRequest[MyCS]]) -> TSD[int, TS[RestResponse[MyCS]]]:
...
To enable the rest handler, the ``register_http_server_adaptor`` should be called to
set up the processes web-service process.
.. note:: The rest handler is self registering, once declared and imported nothing
else needs to be done.
"""
if fn is None:
return lambda fn: rest_handler(fn, url=url, data_type=data_type)
from hgraph import WiringNodeClass
if not isinstance(fn, WiringNodeClass):
fn = graph(fn)
assert "request" in fn.signature.time_series_inputs.keys(), "Rest handler graph must have an input named 'request'"
assert fn.signature.time_series_inputs["request"].matches_type(TS[RestRequest]) or fn.signature.time_series_inputs[
"request"
].matches_type(
TSD[int, TS[RestRequest]]
), f"Graph must have a single input named 'request' of type TS[RestRequest] or TSD[int, TS[RestRequest]]"
assert not url.endswith("/"), "URL cannot end with a '/'"
output_type = fn.signature.output_type
is_tsb = False
if isinstance(output_type, HgTSBTypeMetaData):
is_tsb = True
output_type = output_type["response"]
assert (single_value := output_type.matches_type(TS[RestResponse])) or output_type.matches_type(
TSD[int, TS[RestResponse]]
), "Graph must have a single output of type TS[RestResponse] or TSD[int, TS[RestResponse]]"
if is_tsb:
kwargs = {k: v.py_type for k, v in fn.signature.output_type.bundle_schema_tp.meta_data_schema.items()} | {
"response": TS[HttpResponse] if single_value else TSD[int, TS[HttpResponse]]
}
final_output_type = TSB[ts_schema(**kwargs)]
else:
final_output_type = TS[HttpResponse] if single_value else TSD[int, TS[HttpResponse]]
# If inputs or outputs are not standard, then we use the graph, otherwise we can wire up?
url = f"{url}/?(.*)" # Create a URL that can respond to list, post and appropriate /<id> requests
if single_value:
@http_server_handler(url=url)
@with_signature(
kwargs={k: v for k, v in fn.signature.non_injectable_or_auto_resolvable_inputs.items() if k != "request"},
return_annotation=TS[HttpResponse],
)
def rest_handler_graph(request: TS[HttpRequest], **kwargs) -> TS[HttpResponse]:
rest_request = convert[TS[RestRequest]](request, value_type=data_type)
response = fn(request=rest_request, **kwargs)
rest_response = convert[TS[HttpResponse]](response)
return rest_response
else:
@http_server_handler(url=url)
@with_signature(
kwargs={k: v for k, v in fn.signature.non_injectable_or_auto_resolvable_inputs.items() if k != "request"},
return_annotation=final_output_type,
)
def rest_handler_graph(request: TSD[int, TS[HttpRequest]], **kwargs) -> final_output_type:
rest_requests = map_(lambda request: convert[TS[RestRequest]](request, value_type=data_type), request)
responses = fn(request=rest_requests, **kwargs)
if is_tsb:
rest_reponses = map_(convert[TS[HttpResponse]], responses.response)
return final_output_type.from_ts(
response=rest_reponses, **{k: v for k, v in responses.as_dict().items() if k != "response"}
)
else:
return map_(convert[TS[HttpResponse]], responses)
return rest_handler_graph
@operator
def _convert_to_rest_request(ts: TS[HttpRequest], cs_tp: type[COMPOUND_SCALAR] = None) -> TS[RestRequest]:
return nothing[TS[RestRequest]]()
@compute_node(overloads=_convert_to_rest_request)
def _(ts: TS[HttpGetRequest], cs_tp: type[COMPOUND_SCALAR] = None) -> TS[RestRequest]:
value = ts.value
if value.url_parsed_args and value.url_parsed_args[0]:
return RestReadRequest(url=value.url, id=value.url_parsed_args[0])
else:
return RestListRequest(url=value.url)
@dataclass(frozen=True)
class RestIdValueReqResp(CompoundScalar, Generic[COMPOUND_SCALAR]):
id: str
value: COMPOUND_SCALAR
@graph(overloads=_convert_to_rest_request)
def _(ts: TS[HttpPostRequest], cs_tp: type[COMPOUND_SCALAR] = None) -> TS[RestRequest]:
# A POST should imply create new
request = from_json[TS[RestIdValueReqResp[cs_tp]]](ts.body)
url = ts.url
id_ = request.id
value_ = request.value
return convert[TS[RestRequest]](combine[TS[RestCreateRequest[cs_tp]]](url=url, id=id_, value=value_))
@graph(overloads=_convert_to_rest_request)
def _(ts: TS[HttpPutRequest], cs_tp: type[COMPOUND_SCALAR] = None) -> TS[RestRequest]:
return convert[TS[RestRequest]](
combine[TS[RestUpdateRequest[cs_tp]]](
url=ts.url, id=ts.url_parsed_args[0], value=from_json[TS[cs_tp]](ts.body)
),
)
@graph(overloads=_convert_to_rest_request)
def _(ts: TS[HttpDeleteRequest], cs_tp: type[COMPOUND_SCALAR] = None) -> TS[RestRequest]:
return convert[TS[RestRequest]](combine[TS[RestDeleteRequest]](url=ts.url, id=ts.url_parsed_args[0]))
@graph(overloads=convert)
def convert_to_rest_request(
ts: TS[HttpRequest],
to: type[OUT] = OUT,
value_type: type[COMPOUND_SCALAR] = None,
) -> TS[RestRequest]:
return dispatch_(_convert_to_rest_request, ts=ts, cs_tp=value_type)
def _process_response_error(value: HttpResponse) -> tuple[RestResultEnum, str | None]:
status = RestResultEnum(value.status_code)
if status not in (RestResultEnum.OK, RestResultEnum.CREATED):
reason = json.loads(value.body).get("reason", "No Reason Provided")
return status, reason
else:
return status, None
@compute_node(overloads=convert)
def convert_to_rest_list_response(
ts: TS[HttpResponse],
to: type[TS[RestListResponse]] = OUT,
) -> TS[RestListResponse]:
value: HttpResponse = ts.value
status, reason = _process_response_error(value)
if reason:
return RestListResponse(status=status, reason=reason)
else:
ids = tuple(ids_ := json.loads(value.body))
if not isinstance(ids_, (tuple, list)):
return RestListResponse(status=RestResultEnum.BAD_REQUEST, reason="Invalid response body")
return RestListResponse(status=status, ids=ids)
def _extract_id_value_rest_response(
tp: type[REST_RESPONSE], cs_tp: type[COMPOUND_SCALAR], value: HttpResponse
) -> RestResponse:
status, reason = _process_response_error(value)
if reason:
return tp(status=status, reason=reason)
else:
value_ = json.loads(value.body)
v = from_json_builder(RestIdValueReqResp[cs_tp])(value_)
return tp(status=status, id=v.id, value=v.value)
@compute_node(overloads=convert)
def convert_to_rest_read_response(
ts: TS[HttpResponse],
to: type[TS[RestReadResponse[COMPOUND_SCALAR]]] = OUT,
_cs_tp: type[COMPOUND_SCALAR] = AUTO_RESOLVE,
) -> TS[RestReadResponse[COMPOUND_SCALAR]]:
value: HttpResponse = ts.value
return _extract_id_value_rest_response(RestReadResponse[_cs_tp], _cs_tp, value)
@compute_node(overloads=convert)
def convert_to_rest_create_response(
ts: TS[HttpResponse],
to: type[TS[RestCreateResponse[COMPOUND_SCALAR]]] = OUT,
_cs_tp: type[COMPOUND_SCALAR] = AUTO_RESOLVE,
) -> TS[RestCreateResponse[COMPOUND_SCALAR]]:
value: HttpResponse = ts.value
return _extract_id_value_rest_response(RestCreateResponse[_cs_tp], _cs_tp, value)
@compute_node(overloads=convert)
def convert_to_rest_update_response(
ts: TS[HttpResponse],
to: type[TS[RestUpdateResponse[COMPOUND_SCALAR]]] = OUT,
_cs_tp: type[COMPOUND_SCALAR] = AUTO_RESOLVE,
) -> TS[RestUpdateResponse[COMPOUND_SCALAR]]:
value: HttpResponse = ts.value
return _extract_id_value_rest_response(RestUpdateResponse[_cs_tp], _cs_tp, value)
@compute_node(overloads=convert)
def convert_to_rest_delete_response(
ts: TS[HttpResponse],
to: type[TS[RestDeleteResponse]] = OUT,
) -> TS[RestDeleteResponse]:
value: HttpResponse = ts.value
status, reason = _process_response_error(value)
if reason:
return RestDeleteResponse(status=status, reason=reason)
else:
return RestDeleteResponse(status=status)
@compute_node(overloads=convert)
def convert_from_rest_response(
ts: TS[REST_RESPONSE],
to: type[TS[HttpResponse]] = OUT,
) -> TS[HttpResponse]:
value: RestResponse = ts.value
if value.status not in (RestResultEnum.OK, RestResultEnum.CREATED):
body = f'{{ "reason": "{value.reason}" }}'.encode()
elif isinstance(value, RestListResponse):
values = (f'"{v}"' for v in value.ids)
body = f'[ {", ".join(values)} ]'.encode()
elif isinstance(value, (RestCreateResponse, RestUpdateResponse, RestReadResponse)):
v = value.value
body = f'{{ "id": "{value.id}", "value": {to_json_builder(type(v))(v)} }}'.encode()
else:
body = b""
return HttpResponse(
status_code=value.status.value,
headers=frozendict({"Content-Type": "application/json"}),
body=body,
)