Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/service.py: 59.65%

57 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17PublishBuildEvent service 

18========================= 

19 

20""" 

21 

22 

23import logging 

24from typing import Iterable, Iterator 

25 

26import grpc 

27from google.protobuf.empty_pb2 import Empty 

28 

29from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import DESCRIPTOR as QBE_DESCRIPTOR 

30from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import QueryEventStreamsRequest, QueryEventStreamsResponse 

31from buildgrid._protos.buildgrid.v2.query_build_events_pb2_grpc import ( 

32 QueryBuildEventsServicer, 

33 add_QueryBuildEventsServicer_to_server, 

34) 

35from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import BuildEvent 

36from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import DESCRIPTOR as PBE_DESCRIPTOR 

37from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import ( 

38 OrderedBuildEvent, 

39 PublishBuildToolEventStreamRequest, 

40 PublishBuildToolEventStreamResponse, 

41 PublishLifecycleEventRequest, 

42) 

43from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2_grpc import ( 

44 PublishBuildEventServicer, 

45 add_PublishBuildEventServicer_to_server, 

46) 

47from buildgrid.server.build_events.storage import BuildEventStream, BuildEventStreamStorage 

48from buildgrid.server.metrics_names import ( 

49 BUILD_EVENTS_PUBLISH_BUILD_TOOL_EVENT_STREAM_EXCEPTION_COUNT_METRIC_NAME, 

50 BUILD_EVENTS_PUBLISH_LIFE_CYCLE_EVENT_EXCEPTION_COUNT_METRIC_NAME, 

51 BUILD_EVENTS_QUERY_EVENT_STREAMS_EXCEPTION_COUNT_METRIC_NAME, 

52) 

53from buildgrid.server.metrics_utils import ExceptionCounter, generator_method_exception_counter 

54from buildgrid.server.servicer import InstancedServicer 

55from buildgrid.server.utils.decorators import track_request_id 

56 

57LOGGER = logging.getLogger(__name__) 

58 

59 

# Names of the ``event`` oneof fields that are treated as build lifecycle
# events. Built once at import time instead of on every call.
_LIFECYCLE_EVENT_NAMES = frozenset(
    {
        "build_enqueued",
        "invocation_attempt_started",
        "invocation_attempt_finished",
        "build_finished",
    }
)


def is_lifecycle_event(event: BuildEvent) -> bool:
    """Return whether ``event`` carries one of the build lifecycle payloads.

    Args:
        event: The build event to inspect. Only the result of
            ``event.WhichOneof("event")`` is consulted.

    Returns:
        ``True`` when ``WhichOneof("event")`` yields ``build_enqueued``,
        ``invocation_attempt_started``, ``invocation_attempt_finished`` or
        ``build_finished``; ``False`` for any other value.

    """
    return event.WhichOneof("event") in _LIFECYCLE_EVENT_NAMES

68 

69 

class PublishBuildEventService(PublishBuildEventServicer, InstancedServicer[BuildEventStreamStorage]):
    """gRPC servicer for the ``PublishBuildEvent`` service.

    Both handlers publish incoming events to streams obtained from the
    storage object returned by ``self.get_instance("")``.

    """

    REGISTER_METHOD = add_PublishBuildEventServicer_to_server
    FULL_NAME = PBE_DESCRIPTOR.services_by_name["PublishBuildEvent"].full_name

    @ExceptionCounter(BUILD_EVENTS_PUBLISH_LIFE_CYCLE_EVENT_EXCEPTION_COUNT_METRIC_NAME)
    def PublishLifecycleEvent(self, request: PublishLifecycleEventRequest, context: grpc.ServicerContext) -> Empty:
        """Handler for PublishLifecycleEvent requests.

        This method takes a request containing a build lifecycle event and
        publishes it to the event stream identified by the event's
        ``stream_id``. A ``build_enqueued`` event creates a new stream; any
        other lifecycle event is published to the existing one.

        Args:
            request: The request whose ``build_event`` is to be published.
            context: The gRPC context. When the request does not carry a
                lifecycle event, its status code is set to
                ``INVALID_ARGUMENT`` and an explanatory detail message is
                attached.

        Returns:
            An ``Empty`` message, for both accepted and rejected requests.

        """
        LOGGER.info("Received a PublishLifecycleEvent request")
        ordered_build_event = request.build_event
        if is_lifecycle_event(ordered_build_event.event):
            stream = self._get_stream_for_event(ordered_build_event)
            stream.publish_event(ordered_build_event)

        else:
            LOGGER.warning("Got a build tool event in a PublishLifecycleEvent request")
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            # Attach a message so the client sees why the request was
            # rejected instead of only a bare status code.
            context.set_details("PublishLifecycleEvent only accepts build lifecycle events")

        return Empty()

    @track_request_id
    @generator_method_exception_counter(BUILD_EVENTS_PUBLISH_BUILD_TOOL_EVENT_STREAM_EXCEPTION_COUNT_METRIC_NAME)
    def PublishBuildToolEventStream(
        self,
        request_iterator: Iterable[PublishBuildToolEventStreamRequest],
        context: grpc.ServicerContext,
    ) -> Iterator[PublishBuildToolEventStreamResponse]:
        """Handler for PublishBuildToolEventStream requests.

        Publishes each received ``ordered_build_event`` to the stream matching
        its ``stream_id`` and acknowledges it.

        Args:
            request_iterator: The incoming requests, consumed in order.
            context: The gRPC context; not used by this handler's body.

        Yields:
            One ``PublishBuildToolEventStreamResponse`` per request, echoing
            the request's ``stream_id`` and ``sequence_number``. The response
            is produced even if no stream was found for the event.

        """
        LOGGER.info("Opened a BuildToolEvent stream")

        for request in request_iterator:
            LOGGER.info("Got a BuildToolEvent on the stream")

            # We don't need to give any special treatment to BuildToolEvents, so
            # just call the underlying `get_stream` method here.
            stream = self.get_instance("").get_stream(request.ordered_build_event.stream_id)

            # `stream` should never be `None`, but in case the internals change
            # in future let's be safe.
            if stream is not None:
                stream.publish_event(request.ordered_build_event)

            yield PublishBuildToolEventStreamResponse(
                stream_id=request.ordered_build_event.stream_id,
                sequence_number=request.ordered_build_event.sequence_number,
            )

    def _get_stream_for_event(self, event: OrderedBuildEvent) -> BuildEventStream:
        """Return the stream that ``event`` should be published to.

        Args:
            event: The ordered build event about to be published.

        Returns:
            The result of ``new_stream(event.stream_id)`` when the event is a
            ``build_enqueued`` event, otherwise the result of
            ``get_stream(event.stream_id)``.

        """
        # If this is the start of a new build, then we want a new stream
        if event.event.WhichOneof("event") == "build_enqueued":
            return self.get_instance("").new_stream(event.stream_id)
        return self.get_instance("").get_stream(event.stream_id)

126 

127 

class QueryBuildEventsService(QueryBuildEventsServicer, InstancedServicer[BuildEventStreamStorage]):
    """gRPC servicer for the ``QueryBuildEvents`` service."""

    REGISTER_METHOD = add_QueryBuildEventsServicer_to_server
    FULL_NAME = QBE_DESCRIPTOR.services_by_name["QueryBuildEvents"].full_name

    @track_request_id
    @ExceptionCounter(BUILD_EVENTS_QUERY_EVENT_STREAMS_EXCEPTION_COUNT_METRIC_NAME)
    def QueryEventStreams(
        self, request: QueryEventStreamsRequest, context: grpc.ServicerContext
    ) -> QueryEventStreamsResponse:
        """Handler for QueryEventStreams requests.

        Args:
            request: The query; its ``build_id_pattern`` is handed to the
                storage as ``stream_key_regex``.
            context: The gRPC context; not used by this handler's body.

        Returns:
            A ``QueryEventStreamsResponse`` whose ``streams`` field holds the
            ``to_grpc_message()`` form of every matching stream.

        """
        LOGGER.info("Got a QueryEventStreams request")

        storage = self.get_instance("")
        matched_streams = storage.get_matching_streams(stream_key_regex=request.build_id_pattern)
        stream_messages = [matched.to_grpc_message() for matched in matched_streams]
        return QueryEventStreamsResponse(streams=stream_messages)