Coverage for databases/tests/test_transactions.py: 100%

82 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-08-27 18:25 +0000

1import asyncio 

2from typing import Optional 

3from datetime import timedelta 

4 

5import pytest 

6 

7import prisma 

8from prisma import Prisma 

9from prisma.models import User, Profile 

10 

11from ..utils import CURRENT_DATABASE 

12 

13 

@pytest.mark.asyncio
async def test_model_query(client: Prisma) -> None:
    """Model-based queries (`Model.prisma(tx)`) can be run against a transaction client"""
    tx_timeout = timedelta(milliseconds=1000)

    async with client.tx(timeout=tx_timeout) as tx:
        created = await User.prisma(tx).create({'name': 'Robert'})
        assert created.name == 'Robert'

        # the non-transactional client must not see the uncommitted record
        assert await client.user.count() == 0

        await Profile.prisma(tx).create(
            {
                'description': 'Hello, there!',
                'country': 'Scotland',
                'user': {'connect': {'id': created.id}},
            },
        )

    # queried through the regular client, outside of the transaction block
    found = await client.user.find_unique(
        where={'id': created.id},
        include={'profile': True},
    )
    assert found is not None
    assert found.name == 'Robert'

    profile = found.profile
    assert profile is not None
    assert profile.description == 'Hello, there!'

41 

42 

@pytest.mark.asyncio
async def test_context_manager(client: Prisma) -> None:
    """Queries can be issued on the client yielded by the `client.tx()` context manager"""
    tx_timeout = timedelta(milliseconds=1000)

    async with client.tx(timeout=tx_timeout) as transaction:
        created = await transaction.user.create({'name': 'Robert'})
        assert created.name == 'Robert'

        # the non-transactional client must not see the uncommitted record
        assert await client.user.count() == 0

        await transaction.profile.create(
            {
                'description': 'Hello, there!',
                'country': 'Scotland',
                'user': {'connect': {'id': created.id}},
            },
        )

    # queried through the regular client, outside of the transaction block
    found = await client.user.find_unique(
        where={'id': created.id},
        include={'profile': True},
    )
    assert found is not None
    assert found.name == 'Robert'

    profile = found.profile
    assert profile is not None
    assert profile.description == 'Hello, there!'

70 

71 

@pytest.mark.asyncio
async def test_context_manager_auto_rollback(client: Prisma) -> None:
    """A record created inside a transaction block that exits with an error is not persisted"""
    created: Optional[User] = None

    with pytest.raises(RuntimeError) as exc_info:
        async with client.tx() as tx:
            created = await tx.user.create({'name': 'Tegan'})
            raise RuntimeError('Error ocurred mid transaction.')

    assert exc_info.match('Error ocurred mid transaction.')

    # the create call returned before the error was raised, so we have an ID
    # to look up through the regular client
    assert created is not None
    lookup = await client.user.find_unique(where={'id': created.id})
    assert lookup is None

87 

88 

@pytest.mark.asyncio
async def test_batch_within_transaction(client: Prisma) -> None:
    """A query batcher created from a transaction client runs inside that transaction"""
    tx_timeout = timedelta(milliseconds=10000)

    async with client.tx(timeout=tx_timeout) as transaction:
        async with transaction.batch_() as batch:
            batch.user.create({'name': 'Tegan'})
            batch.user.create({'name': 'Robert'})

        # both users exist for the transaction client but not for the regular client yet
        outside_count = await client.user.count()
        assert outside_count == 0
        inside_count = await transaction.user.count()
        assert inside_count == 2

    # after the transaction block has exited the regular client sees both users
    assert await client.user.count() == 2

101 

102 

@pytest.mark.asyncio
async def test_timeout(client: Prisma) -> None:
    """Using a transaction after its timeout has elapsed raises `TransactionExpiredError`."""
    expired_error = prisma.errors.TransactionExpiredError

    # The outer block is necessary because, to the context manager, it appears that
    # no error occurred, so it will attempt to commit the transaction, triggering
    # the expired error again.
    with pytest.raises(expired_error):
        async with client.tx(timeout=timedelta(milliseconds=50)) as transaction:
            # sleep for longer than the 50ms transaction timeout
            await asyncio.sleep(0.1)

            with pytest.raises(expired_error) as exc_info:
                await transaction.user.create({'name': 'Robert'})

            # hand the captured error over to the outer `pytest.raises` block
            raise exc_info.value

116 

117 

@pytest.mark.asyncio
@pytest.mark.skipif(CURRENT_DATABASE == 'sqlite', reason='SQLite does not support concurrent writes')
async def test_concurrent_transactions(client: Prisma) -> None:
    """Two transactions opened at the same time do not see each other's uncommitted writes"""
    timeout = timedelta(milliseconds=15000)

    async with client.tx(timeout=timeout) as tx1, client.tx(timeout=timeout) as tx2:
        tegan = await tx1.user.create({'name': 'Tegan'})
        robert = await tx2.user.create({'name': 'Robert'})

        # neither transaction can see the record created by the other one
        assert await tx1.user.find_first(where={'name': 'Robert'}) is None
        assert await tx2.user.find_first(where={'name': 'Tegan'}) is None

        # each transaction can see its own record
        own_tegan = await tx1.user.find_first(where={'name': 'Tegan'})
        assert own_tegan is not None
        assert own_tegan.id == tegan.id

        own_robert = await tx2.user.find_first(where={'name': 'Robert'})
        assert own_robert is not None
        assert own_robert.id == robert.id

        # nothing has leaked to the regular client or across the transactions
        assert await client.user.count() == 0
        assert (await tx1.user.find_first(where={'name': robert.name})) is None
        assert (await tx2.user.find_first(where={'name': tegan.name})) is None

    # both transaction blocks have exited, the regular client now sees both users
    assert await client.user.count() == 2

144 

145 

@pytest.mark.asyncio
async def test_transaction_raises_original_error(client: Prisma) -> None:
    """An error raised inside the transaction block propagates out of it unchanged"""
    original_error = RuntimeError('Test error!')

    with pytest.raises(RuntimeError, match=r'Test error!'):
        async with client.tx():
            raise original_error

152 

153 

@pytest.mark.asyncio
@pytest.mark.skipif(CURRENT_DATABASE == 'sqlite', reason='SQLite does not support concurrent writes')
async def test_transaction_within_transaction_warning(client: Prisma) -> None:
    """Manually starting a transaction from a transaction client emits a `UserWarning`"""
    expected_message = 'The current client is already in a transaction. This can lead to surprising behaviour.'

    # NOTE(review): neither transaction started here is committed or rolled back
    # within this test - confirm that relying on the timeout for cleanup is intended
    tx1 = await client.tx().start()
    with pytest.warns(UserWarning) as captured:
        await tx1.tx().start()

    assert len(captured) == 1
    record = captured[0]

    # the recorded message must be a warning instance, not a plain string
    assert not isinstance(record.message, str)
    assert record.message.args[0] == expected_message

    # the warning must be attributed to this file
    assert record.filename == __file__

170 

171 

@pytest.mark.asyncio
@pytest.mark.skipif(CURRENT_DATABASE == 'sqlite', reason='SQLite does not support concurrent writes')
async def test_transaction_within_transaction_context_warning(
    client: Prisma,
) -> None:
    """Entering a transaction context manager from a transaction client emits a `UserWarning`"""
    expected_message = 'The current client is already in a transaction. This can lead to surprising behaviour.'

    async with client.tx() as tx1:
        with pytest.warns(UserWarning) as captured:
            async with tx1.tx():
                pass

    assert len(captured) == 1
    record = captured[0]

    # the recorded message must be a warning instance, not a plain string
    assert not isinstance(record.message, str)
    assert record.message.args[0] == expected_message

    # the warning must be attributed to this file
    assert record.filename == __file__

191 

192 

@pytest.mark.asyncio
async def test_transaction_not_started(client: Prisma) -> None:
    """Calling `commit()` or `rollback()` on a transaction that has not been
    started raises a `TransactionNotStartedError`.
    """
    tx = client.tx()

    # same order as before: `commit()` first, then `rollback()`
    for finalise in (tx.commit, tx.rollback):
        with pytest.raises(prisma.errors.TransactionNotStartedError):
            await finalise()

205 

206 

@pytest.mark.asyncio
async def test_transaction_already_closed(client: Prisma) -> None:
    """Querying with a transaction client after its context block has exited raises an error"""
    async with client.tx() as transaction:
        pass

    # the block above has already exited, so the transaction can no longer be used
    with pytest.raises(prisma.errors.TransactionExpiredError) as exc_info:
        await transaction.user.delete_many()

    assert exc_info.match('Transaction already closed')