Coverage for src/prisma/engine/_abstract.py: 98%
55 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-08-27 18:25 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-08-27 18:25 +0000
1from __future__ import annotations
3from abc import ABC, abstractmethod
4from typing import TYPE_CHECKING, Any, overload
5from datetime import timedelta
6from typing_extensions import Literal
8from .._types import TransactionId
9from .._compat import get_running_loop
10from .._constants import DEFAULT_CONNECT_TIMEOUT
12if TYPE_CHECKING:
13 from ..types import MetricsFormat, DatasourceOverride # noqa: TID251
# Names exported by a star-import of this module. Only the two concrete
# abstract-engine flavours are listed; `BaseAbstractEngine` is not.
__all__ = (
    'SyncAbstractEngine',
    'AsyncAbstractEngine',
)
class BaseAbstractEngine(ABC):
    """Common shutdown interface shared by the sync and async abstract engines.

    Subclasses must implement both `close()` and `aclose()`; `stop()` is a
    concrete convenience wrapper that drives the two of them.
    """

    dml: str

    # Strong references to `aclose()` tasks scheduled by `stop()`.
    #
    # The event loop only keeps weak references to tasks, so a task whose
    # result is discarded may be garbage collected before it finishes. The set
    # is deliberately shared at class level (not per instance) so that a task
    # stays alive even if the engine that scheduled it is itself collected.
    _pending_close_tasks: set[Any] = set()

    def stop(self, *, timeout: timedelta | None = None) -> None:
        """Close the engine from synchronous code.

        `close()` is always called first. If an event loop is running in the
        current thread, `aclose()` is additionally scheduled on it as a
        background task; the task is not awaited here, so asynchronous cleanup
        may still be in progress when this method returns.

        Args:
            timeout: Forwarded unchanged to both `close()` and `aclose()`.
        """
        self.close(timeout=timeout)

        try:
            loop = get_running_loop()
        except RuntimeError:
            # no event loop in the current thread, we cannot cleanup asynchronously
            return

        if loop.is_closed():
            return

        task = loop.create_task(self.aclose(timeout=timeout))

        # Hold the task until it completes, then let it go again so the set
        # does not grow without bound.
        pending = BaseAbstractEngine._pending_close_tasks
        pending.add(task)
        task.add_done_callback(pending.discard)

    @abstractmethod
    def close(self, *, timeout: timedelta | None = None) -> None:
        """Synchronous method for closing the engine, useful if the underlying engine uses a subprocess"""
        ...

    # TODO(#871): don't include for the sync client
    @abstractmethod
    async def aclose(self, *, timeout: timedelta | None = None) -> None:
        """Asynchronous method for closing the engine, only used if an asynchronous client is generated"""
        ...
class SyncAbstractEngine(BaseAbstractEngine):
    """Abstract engine interface whose operations are plain (blocking) methods."""

    @abstractmethod
    def connect(
        self,
        timeout: timedelta = DEFAULT_CONNECT_TIMEOUT,
        datasources: list[DatasourceOverride] | None = None,
    ) -> None:
        """Connect to the engine.

        Args:
            timeout: Defaults to `DEFAULT_CONNECT_TIMEOUT`; presumably the
                longest time to wait for the connection - confirm against
                the concrete implementations.
            datasources: Optional list of datasource overrides.
        """

    @abstractmethod
    def query(self, content: str, *, tx_id: TransactionId | None) -> Any:
        """Execute a GraphQL query.

        The request passed to this method is expected to be a JSON object
        shaped like:

            {
                'variables': {},
                'operation_name': str,
                'query': str,
            }

        Args:
            content: The request described above.
            tx_id: A transaction ID, or `None`; presumably `None` means the
                query runs outside of an interactive transaction.
        """

    @abstractmethod
    def start_transaction(self, *, content: str) -> TransactionId:
        """Begin an interactive transaction.

        Returns:
            The transaction ID that can be used to perform subsequent operations.
        """

    @abstractmethod
    def commit_transaction(self, tx_id: TransactionId) -> None:
        """Commit the interactive transaction `tx_id`; it is no longer usable afterwards."""

    @abstractmethod
    def rollback_transaction(self, tx_id: TransactionId) -> None:
        """Roll back the interactive transaction `tx_id`; it is no longer usable afterwards."""

    @overload
    @abstractmethod
    def metrics(self, *, format: Literal['json'], global_labels: dict[str, str] | None) -> dict[str, Any]: ...

    @overload
    @abstractmethod
    def metrics(self, *, format: Literal['prometheus'], global_labels: dict[str, str] | None) -> str: ...

    @abstractmethod
    def metrics(
        self,
        *,
        format: MetricsFormat,
        global_labels: dict[str, str] | None,
    ) -> str | dict[str, Any]:
        """Return engine metrics in the requested format.

        Args:
            format: `'json'` yields a `dict[str, Any]`, `'prometheus'` yields
                a `str` (see the overloads).
            global_labels: Optional `str -> str` mapping of labels, or `None`;
                how they are applied is left to the implementation.
        """
class AsyncAbstractEngine(BaseAbstractEngine):
    """Abstract engine interface whose operations are coroutines."""

    @abstractmethod
    async def connect(
        self,
        timeout: timedelta = DEFAULT_CONNECT_TIMEOUT,
        datasources: list[DatasourceOverride] | None = None,
    ) -> None:
        """Connect to the engine.

        Args:
            timeout: Defaults to `DEFAULT_CONNECT_TIMEOUT`; presumably the
                longest time to wait for the connection - confirm against
                the concrete implementations.
            datasources: Optional list of datasource overrides.
        """

    @abstractmethod
    async def query(self, content: str, *, tx_id: TransactionId | None) -> Any:
        """Execute a GraphQL query.

        The request passed to this method is expected to be a JSON object
        shaped like:

            {
                'variables': {},
                'operation_name': str,
                'query': str,
            }

        Args:
            content: The request described above.
            tx_id: A transaction ID, or `None`; presumably `None` means the
                query runs outside of an interactive transaction.
        """

    @abstractmethod
    async def start_transaction(self, *, content: str) -> TransactionId:
        """Begin an interactive transaction.

        Returns:
            The transaction ID that can be used to perform subsequent operations.
        """

    @abstractmethod
    async def commit_transaction(self, tx_id: TransactionId) -> None:
        """Commit the interactive transaction `tx_id`; it is no longer usable afterwards."""

    @abstractmethod
    async def rollback_transaction(self, tx_id: TransactionId) -> None:
        """Roll back the interactive transaction `tx_id`; it is no longer usable afterwards."""

    @overload
    @abstractmethod
    async def metrics(self, *, format: Literal['json'], global_labels: dict[str, str] | None) -> dict[str, Any]: ...

    @overload
    @abstractmethod
    async def metrics(self, *, format: Literal['prometheus'], global_labels: dict[str, str] | None) -> str: ...

    @abstractmethod
    async def metrics(
        self,
        *,
        format: MetricsFormat,
        global_labels: dict[str, str] | None,
    ) -> str | dict[str, Any]:
        """Return engine metrics in the requested format.

        Args:
            format: `'json'` yields a `dict[str, Any]`, `'prometheus'` yields
                a `str` (see the overloads).
            global_labels: Optional `str -> str` mapping of labels, or `None`;
                how they are applied is left to the implementation.
        """