Coverage for src/prisma/engine/_abstract.py: 98%

55 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-28 15:17 +0000

1from __future__ import annotations 

2 

3from abc import ABC, abstractmethod 

4from typing import TYPE_CHECKING, Any, overload 

5from datetime import timedelta 

6from typing_extensions import Literal 

7 

8from .._types import TransactionId 

9from .._compat import get_running_loop 

10from .._constants import DEFAULT_CONNECT_TIMEOUT 

11 

12if TYPE_CHECKING: 

13 from ..types import MetricsFormat, DatasourceOverride # noqa: TID251 

14 

15 

16__all__ = ( 

17 'SyncAbstractEngine', 

18 'AsyncAbstractEngine', 

19) 

20 

21 

22class BaseAbstractEngine(ABC): 

23 dml: str 

24 

25 def stop(self, *, timeout: timedelta | None = None) -> None: 

26 """Wrapper for synchronously calling close() and aclose()""" 

27 self.close(timeout=timeout) 

28 try: 

29 loop = get_running_loop() 

30 except RuntimeError: 

31 # no event loop in the current thread, we cannot cleanup asynchronously 

32 return 

33 else: 

34 if not loop.is_closed(): 34 ↛ exitline 34 didn't return from function 'stop', because the condition on line 34 was never false

35 loop.create_task(self.aclose(timeout=timeout)) 

36 

37 @abstractmethod 

38 def close(self, *, timeout: timedelta | None = None) -> None: 

39 """Synchronous method for closing the engine, useful if the underlying engine uses a subprocess""" 

40 ... 

41 

42 # TODO(#871): don't include for the sync client 

43 @abstractmethod 

44 async def aclose(self, *, timeout: timedelta | None = None) -> None: 

45 """Asynchronous method for closing the engine, only used if an asynchronous client is generated""" 

46 ... 

47 

48 

49class SyncAbstractEngine(BaseAbstractEngine): 

50 @abstractmethod 

51 def connect( 

52 self, 

53 timeout: timedelta = DEFAULT_CONNECT_TIMEOUT, 

54 datasources: list[DatasourceOverride] | None = None, 

55 ) -> None: 

56 """Connect to the engine""" 

57 ... 

58 

59 @abstractmethod 

60 def query(self, content: str, *, tx_id: TransactionId | None) -> Any: 

61 """Execute a GraphQL query. 

62 

63 This method expects a JSON object matching this structure: 

64 

65 { 

66 'variables': {}, 

67 'operation_name': str, 

68 'query': str, 

69 } 

70 """ 

71 ... 

72 

73 @abstractmethod 

74 def start_transaction(self, *, content: str) -> TransactionId: 

75 """Start an interactive transaction, returns the transaction ID that can be used to perform subsequent operations""" 

76 ... 

77 

78 @abstractmethod 

79 def commit_transaction(self, tx_id: TransactionId) -> None: 

80 """Commit an interactive transaction, the given transaction will no longer be usable""" 

81 ... 

82 

83 @abstractmethod 

84 def rollback_transaction(self, tx_id: TransactionId) -> None: 

85 """Rollback an interactive transaction, the given transaction will no longer be usable""" 

86 ... 

87 

88 @overload 

89 @abstractmethod 

90 def metrics( 

91 self, 

92 *, 

93 format: Literal['json'], 

94 global_labels: dict[str, str] | None, 

95 ) -> dict[str, Any]: ... 

96 

97 @overload 

98 @abstractmethod 

99 def metrics( 

100 self, 

101 *, 

102 format: Literal['prometheus'], 

103 global_labels: dict[str, str] | None, 

104 ) -> str: ... 

105 

106 @abstractmethod 

107 def metrics( 

108 self, 

109 *, 

110 format: MetricsFormat, 

111 global_labels: dict[str, str] | None, 

112 ) -> str | dict[str, Any]: ... 

113 

114 

115class AsyncAbstractEngine(BaseAbstractEngine): 

116 @abstractmethod 

117 async def connect( 

118 self, 

119 timeout: timedelta = DEFAULT_CONNECT_TIMEOUT, 

120 datasources: list[DatasourceOverride] | None = None, 

121 ) -> None: 

122 """Connect to the engine""" 

123 ... 

124 

125 @abstractmethod 

126 async def query(self, content: str, *, tx_id: TransactionId | None) -> Any: 

127 """Execute a GraphQL query. 

128 

129 This method expects a JSON object matching this structure: 

130 

131 { 

132 'variables': {}, 

133 'operation_name': str, 

134 'query': str, 

135 } 

136 """ 

137 ... 

138 

139 @abstractmethod 

140 async def start_transaction(self, *, content: str) -> TransactionId: 

141 """Start an interactive transaction, returns the transaction ID that can be used to perform subsequent operations""" 

142 ... 

143 

144 @abstractmethod 

145 async def commit_transaction(self, tx_id: TransactionId) -> None: 

146 """Commit an interactive transaction, the given transaction will no longer be usable""" 

147 ... 

148 

149 @abstractmethod 

150 async def rollback_transaction(self, tx_id: TransactionId) -> None: 

151 """Rollback an interactive transaction, the given transaction will no longer be usable""" 

152 ... 

153 

154 @overload 

155 @abstractmethod 

156 async def metrics( 

157 self, 

158 *, 

159 format: Literal['json'], 

160 global_labels: dict[str, str] | None, 

161 ) -> dict[str, Any]: ... 

162 

163 @overload 

164 @abstractmethod 

165 async def metrics( 

166 self, 

167 *, 

168 format: Literal['prometheus'], 

169 global_labels: dict[str, str] | None, 

170 ) -> str: ... 

171 

172 @abstractmethod 

173 async def metrics( 

174 self, 

175 *, 

176 format: MetricsFormat, 

177 global_labels: dict[str, str] | None, 

178 ) -> str | dict[str, Any]: ...