Coverage for src/prisma/utils.py: 83%

67 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-28 15:17 +0000

1from __future__ import annotations 

2 

3import os 

4import time 

5import asyncio 

6import inspect 

7import logging 

8import warnings 

9import contextlib 

10from typing import TYPE_CHECKING, Any, Dict, Union, TypeVar, Iterator, NoReturn, Coroutine 

11from importlib.util import find_spec 

12 

13from ._types import CoroType, FuncType, TypeGuard 

14 

15if TYPE_CHECKING: 

16 from typing_extensions import TypeGuard 

17 

18_T = TypeVar('_T') 

19 

20 

21def _env_bool(key: str) -> bool: 

22 return os.environ.get(key, '').lower() in {'1', 't', 'true'} 

23 

24 

25DEBUG = _env_bool('PRISMA_PY_DEBUG') 

26DEBUG_GENERATOR = _env_bool('PRISMA_PY_DEBUG_GENERATOR') 

27 

28 

29class _NoneType: # pyright: ignore[reportUnusedClass] 

30 def __bool__(self) -> bool: 

31 return False 

32 

33 

34def time_since(start: float, precision: int = 4) -> str: 

35 # TODO: prettier output 

36 delta = round(time.monotonic() - start, precision) 

37 return f'{delta}s' 

38 

39 

40def setup_logging() -> None: 

41 if DEBUG: 

42 logging.getLogger('prisma').setLevel(logging.DEBUG) 

43 

44 

45def maybe_async_run( 

46 func: Union[FuncType, CoroType], 

47 *args: Any, 

48 **kwargs: Any, 

49) -> object: 

50 if is_coroutine(func): 

51 return async_run(func(*args, **kwargs)) 

52 return func(*args, **kwargs) 

53 

54 

55def async_run(coro: Coroutine[Any, Any, _T]) -> _T: 

56 """Execute the coroutine and return the result.""" 

57 return get_or_create_event_loop().run_until_complete(coro) 

58 

59 

60def is_coroutine(obj: Any) -> TypeGuard[CoroType]: 

61 return asyncio.iscoroutinefunction(obj) or inspect.isgeneratorfunction(obj) 

62 

63 

64def module_exists(name: str) -> bool: 

65 return find_spec(name) is not None 

66 

67 

68@contextlib.contextmanager 

69def temp_env_update(env: Dict[str, str]) -> Iterator[None]: 

70 old = os.environ.copy() 

71 

72 try: 

73 os.environ.update(env) 

74 yield 

75 finally: 

76 for key in env: 

77 os.environ.pop(key, None) 

78 

79 os.environ.update(old) 

80 

81 

82@contextlib.contextmanager 

83def monkeypatch(obj: Any, attr: str, new: Any) -> Any: 

84 """Temporarily replace a method with a new funtion 

85 

86 The previously set method is passed as the first argument to the new function 

87 """ 

88 

89 def patched(*args: Any, **kwargs: Any) -> Any: 

90 return new(old, *args, **kwargs) 

91 

92 old = getattr(obj, attr) 

93 

94 try: 

95 setattr(obj, attr, patched) 

96 yield 

97 finally: 

98 setattr(obj, attr, old) 

99 

100 

101def get_or_create_event_loop() -> asyncio.AbstractEventLoop: 

102 """Return the currently set event loop or create a new event loop if there 

103 is no set event loop. 

104 

105 Starting from python3.10, asyncio.get_event_loop() raises a DeprecationWarning 

106 when there is no event loop set, this deprecation will be enforced starting from 

107 python3.12 

108 

109 This function serves as a future-proof wrapper over asyncio.get_event_loop() 

110 that preserves the old behaviour. 

111 """ 

112 with warnings.catch_warnings(): 

113 warnings.filterwarnings('ignore', category=DeprecationWarning) 

114 

115 try: 

116 return asyncio.get_event_loop() 

117 except RuntimeError: 

118 loop = asyncio.new_event_loop() 

119 asyncio.set_event_loop(loop) 

120 return loop 

121 

122 

123def assert_never(value: NoReturn) -> NoReturn: 

124 """Used by type checkers for exhaustive match cases. 

125 

126 https://github.com/microsoft/pyright/issues/767 

127 """ 

128 raise AssertionError('Unhandled type: {}'.format(type(value).__name__)) # pragma: no cover 

129 

130 

131def make_optional(value: _T) -> _T | None: 

132 """Helper function for type checkers to change the given type to include None. 

133 

134 This is useful in cases where you do not have an explicit type for a symbol (e.g. modules) 

135 but want to mark it as potentially None. 

136 """ 

137 return value 

138 

139 

140def is_dict(obj: object) -> TypeGuard[dict[object, object]]: 

141 return isinstance(obj, dict)