Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/decorators/instance.py: 100.00%

35 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-03-13 15:36 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import functools 

16import inspect 

17import itertools 

18from typing import Any, Callable, Iterator, TypeVar, cast 

19 

20import grpc 

21 

22from buildgrid.server.context import instance_context 

23from buildgrid.server.decorators.errors import error_context 

24 

25Func = TypeVar("Func", bound=Callable) # type: ignore[type-arg] 

26 

27 

28def instanced(get_instance_name: Callable[[Any], str]) -> Callable[[Func], Func]: 

29 def _get_instance_name(context: grpc.ServicerContext, message: Any) -> str: 

30 with error_context(context, "Unexpected error when determining instance name"): 

31 return get_instance_name(message) 

32 

33 def decorator(f: Func) -> Func: 

34 @functools.wraps(f) 

35 def server_stream_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Iterator[Any]: 

36 if isinstance(message, Iterator): 

37 # Pop the message out to get the instance from it, then and recreate the iterator. 

38 first_message = next(message) 

39 message = itertools.chain([first_message], message) 

40 instance_name = _get_instance_name(context, first_message) 

41 else: 

42 instance_name = _get_instance_name(context, message) 

43 

44 with instance_context(instance_name): 

45 yield from f(self, message, context) 

46 

47 @functools.wraps(f) 

48 def server_unary_wrapper(self: Any, message: Any, context: grpc.ServicerContext) -> Any: 

49 if isinstance(message, Iterator): 

50 # Pop the message out to get the instance from it, then and recreate the iterator. 

51 first_message = next(message) 

52 message = itertools.chain([first_message], message) 

53 instance_name = _get_instance_name(context, first_message) 

54 else: 

55 instance_name = _get_instance_name(context, message) 

56 

57 with instance_context(instance_name): 

58 return f(self, message, context) 

59 

60 if inspect.isgeneratorfunction(f): 

61 return cast(Func, server_stream_wrapper) 

62 return cast(Func, server_unary_wrapper) 

63 

64 return decorator