Coverage for src/prisma/engine/_http.py: 91%

104 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-28 15:17 +0000

1from __future__ import annotations 

2 

3import json 

4import logging 

5from typing import Any, NoReturn 

6from datetime import timedelta 

7from typing_extensions import override 

8 

9import httpx 

10 

11from . import utils, errors 

12from ..utils import is_dict 

13from .._types import Method 

14from ._abstract import SyncAbstractEngine, AsyncAbstractEngine 

15from .._sync_http import SyncHTTP 

16from .._async_http import AsyncHTTP 

17from ..http_abstract import AbstractResponse 

18 

19log: logging.Logger = logging.getLogger(__name__) 

20 

21 

22class BaseHTTPEngine: 

23 """Engine wrapper that communicates to the underlying engine over HTTP""" 

24 

25 url: str | None 

26 headers: dict[str, str] 

27 

28 def __init__( 

29 self, 

30 *, 

31 url: str | None, 

32 headers: dict[str, str] | None = None, 

33 ) -> None: 

34 super().__init__() 

35 self.url = url 

36 self.headers = headers if headers is not None else {} 

37 

38 def _build_request( 

39 self, 

40 *, 

41 path: str, 

42 method: Method, 

43 content: Any, 

44 headers: dict[str, str] | None, 

45 parse_response: bool, 

46 ) -> tuple[str, dict[str, Any]]: 

47 if self.url is None: 47 ↛ 48line 47 didn't jump to line 48, because the condition on line 47 was never true

48 raise errors.NotConnectedError('Not connected to the query engine') 

49 

50 kwargs = { 

51 'headers': { 

52 **self.headers, 

53 } 

54 } 

55 

56 if parse_response: 

57 kwargs['headers']['Accept'] = 'application/json' 

58 

59 if headers is not None: 

60 kwargs['headers'].update(headers) 

61 

62 if content is not None: 

63 kwargs['content'] = content 

64 

65 url = self.url + path 

66 log.debug('Constructed %s request to %s', method, url) 

67 log.debug('Request headers: %s', kwargs['headers']) 

68 log.debug('Request content: %s', content) 

69 

70 return url, kwargs 

71 

72 def _process_response_data( 

73 self, 

74 *, 

75 data: object, 

76 response: AbstractResponse[httpx.Response], 

77 ) -> Any: 

78 if isinstance(data, str): 78 ↛ 80line 78 didn't jump to line 80, because the condition on line 78 was never true

79 # workaround for https://github.com/prisma/prisma-engines/pull/4246 

80 data = json.loads(data) 

81 

82 if not is_dict(data): 82 ↛ 83line 82 didn't jump to line 83, because the condition on line 82 was never true

83 raise TypeError(f'Expected deserialised engine response to be a dictionary, got {type(data)} - {data}') 

84 

85 errors_data = data.get('errors') 

86 if errors_data: 

87 return utils.handle_response_errors(response, errors_data) 

88 

89 return data 

90 

91 def _process_response_error( 

92 self, 

93 *, 

94 body: str, 

95 response: AbstractResponse[httpx.Response], 

96 ) -> NoReturn: 

97 if response.status == 422: 97 ↛ 101line 97 didn't jump to line 101, because the condition on line 97 was never false

98 raise errors.UnprocessableEntityError(response) 

99 

100 # TODO: handle errors better 

101 raise errors.EngineRequestError(response, body) 

102 

103 

104class SyncHTTPEngine(BaseHTTPEngine, SyncAbstractEngine): 

105 session: SyncHTTP 

106 

107 def __init__( 

108 self, 

109 url: str | None, 

110 headers: dict[str, str] | None = None, 

111 **kwargs: Any, 

112 ) -> None: 

113 super().__init__(url=url, headers=headers) 

114 self.session = SyncHTTP(**kwargs) 

115 

116 @override 

117 def close( 

118 self, 

119 *, 

120 timeout: timedelta | None = None, # noqa: ARG002 

121 ) -> None: 

122 self._close_session() 

123 

124 @override 

125 async def aclose(self, *, timeout: timedelta | None = None) -> None: 

126 pass 

127 

128 def _close_session(self) -> None: 

129 if self.session and not self.session.closed: 

130 self.session.close() 

131 

132 # TODO: improve return types 

133 def request( 

134 self, 

135 method: Method, 

136 path: str, 

137 *, 

138 content: Any = None, 

139 headers: dict[str, str] | None = None, 

140 parse_response: bool = True, 

141 ) -> Any: 

142 url, kwargs = self._build_request( 

143 path=path, 

144 method=method, 

145 content=content, 

146 headers=headers, 

147 parse_response=parse_response, 

148 ) 

149 

150 response = self.session.request(method, url, **kwargs) 

151 log.debug('%s %s returned status %s', method, url, response.status) 

152 

153 if 300 > response.status >= 200: 

154 # In certain cases we just want to return the response content as-is. 

155 # 

156 # This is useful for metrics which can be returned in a Prometheus format 

157 # which is incompatible with JSON. 

158 if not parse_response: 

159 text = response.text() 

160 log.debug('%s %s returned text: %s', method, url, text) 

161 return text 

162 

163 data = response.json() 

164 log.debug('%s %s returned %s', method, url, data) 

165 

166 return self._process_response_data(data=data, response=response) 

167 

168 self._process_response_error(body=response.text(), response=response) 

169 

170 

171class AsyncHTTPEngine(BaseHTTPEngine, AsyncAbstractEngine): 

172 session: AsyncHTTP 

173 

174 def __init__( 

175 self, 

176 url: str | None, 

177 headers: dict[str, str] | None = None, 

178 **kwargs: Any, 

179 ) -> None: 

180 super().__init__(url=url, headers=headers) 

181 self.session = AsyncHTTP(**kwargs) 

182 

183 @override 

184 def close(self, *, timeout: timedelta | None = None) -> None: 

185 pass 

186 

187 @override 

188 async def aclose( 

189 self, 

190 *, 

191 timeout: timedelta | None = None, # noqa: ARG002 

192 ) -> None: 

193 await self._close_session() 

194 

195 async def _close_session(self) -> None: 

196 if self.session and not self.session.closed: 

197 await self.session.close() 

198 

199 # TODO: improve return types 

200 async def request( 

201 self, 

202 method: Method, 

203 path: str, 

204 *, 

205 content: Any = None, 

206 headers: dict[str, str] | None = None, 

207 parse_response: bool = True, 

208 ) -> Any: 

209 url, kwargs = self._build_request( 

210 path=path, 

211 method=method, 

212 content=content, 

213 headers=headers, 

214 parse_response=parse_response, 

215 ) 

216 

217 response = await self.session.request(method, url, **kwargs) 

218 log.debug('%s %s returned status %s', method, url, response.status) 

219 

220 if 300 > response.status >= 200: 

221 # In certain cases we just want to return the response content as-is. 

222 # 

223 # This is useful for metrics which can be returned in a Prometheus format 

224 # which is incompatible with JSON. 

225 if not parse_response: 

226 text = await response.text() 

227 log.debug('%s %s returned text: %s', method, url, text) 

228 return text 

229 

230 data = await response.json() 

231 log.debug('%s %s returned %s', method, url, data) 

232 

233 return self._process_response_data(data=data, response=response) 

234 

235 self._process_response_error(body=await response.text(), response=response)