Coverage for src/prisma/engine/_http.py: 91%
104 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-08-27 18:25 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-08-27 18:25 +0000
1from __future__ import annotations
3import json
4import logging
5from typing import Any, NoReturn
6from datetime import timedelta
7from typing_extensions import override
9import httpx
11from . import utils, errors
12from ..utils import is_dict
13from .._types import Method
14from ._abstract import SyncAbstractEngine, AsyncAbstractEngine
15from .._sync_http import SyncHTTP
16from .._async_http import AsyncHTTP
17from ..http_abstract import AbstractResponse
19log: logging.Logger = logging.getLogger(__name__)
22class BaseHTTPEngine:
23 """Engine wrapper that communicates to the underlying engine over HTTP"""
25 url: str | None
26 headers: dict[str, str]
28 def __init__(
29 self,
30 *,
31 url: str | None,
32 headers: dict[str, str] | None = None,
33 ) -> None:
34 super().__init__()
35 self.url = url
36 self.headers = headers if headers is not None else {}
38 def _build_request(
39 self,
40 *,
41 path: str,
42 method: Method,
43 content: Any,
44 headers: dict[str, str] | None,
45 parse_response: bool,
46 ) -> tuple[str, dict[str, Any]]:
47 if self.url is None: 47 ↛ 48line 47 didn't jump to line 48, because the condition on line 47 was never true
48 raise errors.NotConnectedError('Not connected to the query engine')
50 kwargs = {
51 'headers': {
52 **self.headers,
53 }
54 }
56 if parse_response:
57 kwargs['headers']['Accept'] = 'application/json'
59 if headers is not None:
60 kwargs['headers'].update(headers)
62 if content is not None:
63 kwargs['content'] = content
65 url = self.url + path
66 log.debug('Constructed %s request to %s', method, url)
67 log.debug('Request headers: %s', kwargs['headers'])
68 log.debug('Request content: %s', content)
70 return url, kwargs
72 def _process_response_data(
73 self,
74 *,
75 data: object,
76 response: AbstractResponse[httpx.Response],
77 ) -> Any:
78 if isinstance(data, str): 78 ↛ 80line 78 didn't jump to line 80, because the condition on line 78 was never true
79 # workaround for https://github.com/prisma/prisma-engines/pull/4246
80 data = json.loads(data)
82 if not is_dict(data): 82 ↛ 83line 82 didn't jump to line 83, because the condition on line 82 was never true
83 raise TypeError(f'Expected deserialised engine response to be a dictionary, got {type(data)} - {data}')
85 errors_data = data.get('errors')
86 if errors_data:
87 return utils.handle_response_errors(response, errors_data)
89 return data
91 def _process_response_error(
92 self,
93 *,
94 body: str,
95 response: AbstractResponse[httpx.Response],
96 ) -> NoReturn:
97 if response.status == 422: 97 ↛ 101line 97 didn't jump to line 101, because the condition on line 97 was never false
98 raise errors.UnprocessableEntityError(response)
100 # TODO: handle errors better
101 raise errors.EngineRequestError(response, body)
104class SyncHTTPEngine(BaseHTTPEngine, SyncAbstractEngine):
105 session: SyncHTTP
107 def __init__(
108 self,
109 url: str | None,
110 headers: dict[str, str] | None = None,
111 **kwargs: Any,
112 ) -> None:
113 super().__init__(url=url, headers=headers)
114 self.session = SyncHTTP(**kwargs)
116 @override
117 def close(
118 self,
119 *,
120 timeout: timedelta | None = None, # noqa: ARG002
121 ) -> None:
122 self._close_session()
124 @override
125 async def aclose(self, *, timeout: timedelta | None = None) -> None:
126 pass
128 def _close_session(self) -> None:
129 if self.session and not self.session.closed:
130 self.session.close()
132 # TODO: improve return types
133 def request(
134 self,
135 method: Method,
136 path: str,
137 *,
138 content: Any = None,
139 headers: dict[str, str] | None = None,
140 parse_response: bool = True,
141 ) -> Any:
142 url, kwargs = self._build_request(
143 path=path,
144 method=method,
145 content=content,
146 headers=headers,
147 parse_response=parse_response,
148 )
150 response = self.session.request(method, url, **kwargs)
151 log.debug('%s %s returned status %s', method, url, response.status)
153 if 300 > response.status >= 200:
154 # In certain cases we just want to return the response content as-is.
155 #
156 # This is useful for metrics which can be returned in a Prometheus format
157 # which is incompatible with JSON.
158 if not parse_response:
159 text = response.text()
160 log.debug('%s %s returned text: %s', method, url, text)
161 return text
163 data = response.json()
164 log.debug('%s %s returned %s', method, url, data)
166 return self._process_response_data(data=data, response=response)
168 self._process_response_error(body=response.text(), response=response)
171class AsyncHTTPEngine(BaseHTTPEngine, AsyncAbstractEngine):
172 session: AsyncHTTP
174 def __init__(
175 self,
176 url: str | None,
177 headers: dict[str, str] | None = None,
178 **kwargs: Any,
179 ) -> None:
180 super().__init__(url=url, headers=headers)
181 self.session = AsyncHTTP(**kwargs)
183 @override
184 def close(self, *, timeout: timedelta | None = None) -> None:
185 pass
187 @override
188 async def aclose(
189 self,
190 *,
191 timeout: timedelta | None = None, # noqa: ARG002
192 ) -> None:
193 await self._close_session()
195 async def _close_session(self) -> None:
196 if self.session and not self.session.closed:
197 await self.session.close()
199 # TODO: improve return types
200 async def request(
201 self,
202 method: Method,
203 path: str,
204 *,
205 content: Any = None,
206 headers: dict[str, str] | None = None,
207 parse_response: bool = True,
208 ) -> Any:
209 url, kwargs = self._build_request(
210 path=path,
211 method=method,
212 content=content,
213 headers=headers,
214 parse_response=parse_response,
215 )
217 response = await self.session.request(method, url, **kwargs)
218 log.debug('%s %s returned status %s', method, url, response.status)
220 if 300 > response.status >= 200:
221 # In certain cases we just want to return the response content as-is.
222 #
223 # This is useful for metrics which can be returned in a Prometheus format
224 # which is incompatible with JSON.
225 if not parse_response:
226 text = await response.text()
227 log.debug('%s %s returned text: %s', method, url, text)
228 return text
230 data = await response.json()
231 log.debug('%s %s returned %s', method, url, data)
233 return self._process_response_data(data=data, response=response)
235 self._process_response_error(body=await response.text(), response=response)