Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/service.py: 61.54%

52 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-08-20 14:29 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17PublishBuildEvent service 

18========================= 

19 

20""" 

21 

22 

23from typing import Iterable, Iterator 

24 

25import grpc 

26from google.protobuf.empty_pb2 import Empty 

27 

28from buildgrid._protos.build.buildgrid.query_build_events_pb2 import DESCRIPTOR as QBE_DESCRIPTOR 

29from buildgrid._protos.build.buildgrid.query_build_events_pb2 import ( 

30 QueryEventStreamsRequest, 

31 QueryEventStreamsResponse, 

32) 

33from buildgrid._protos.build.buildgrid.query_build_events_pb2_grpc import ( 

34 QueryBuildEventsServicer, 

35 add_QueryBuildEventsServicer_to_server, 

36) 

37from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import BuildEvent 

38from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import DESCRIPTOR as PBE_DESCRIPTOR 

39from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import ( 

40 OrderedBuildEvent, 

41 PublishBuildToolEventStreamRequest, 

42 PublishBuildToolEventStreamResponse, 

43 PublishLifecycleEventRequest, 

44) 

45from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2_grpc import ( 

46 PublishBuildEventServicer, 

47 add_PublishBuildEventServicer_to_server, 

48) 

49from buildgrid.server.build_events.storage import BuildEventStream, BuildEventStreamStorage 

50from buildgrid.server.decorators import rpc 

51from buildgrid.server.logging import buildgrid_logger 

52from buildgrid.server.servicer import InstancedServicer 

53 

54LOGGER = buildgrid_logger(__name__) 

55 

56 

# Names of the `event` oneof fields that describe the build lifecycle.
# Built once at import time rather than on every call.
_LIFECYCLE_EVENT_KINDS = frozenset(
    {
        "build_enqueued",
        "invocation_attempt_started",
        "invocation_attempt_finished",
        "build_finished",
    }
)


def is_lifecycle_event(event: BuildEvent) -> bool:
    """Return whether ``event`` is a build lifecycle event.

    The check looks at which field of the ``event`` oneof is populated and
    tests it against the lifecycle kinds: ``build_enqueued``,
    ``invocation_attempt_started``, ``invocation_attempt_finished`` and
    ``build_finished``.

    Args:
        event: The build event to classify.

    Returns:
        ``True`` if the populated oneof field is one of the lifecycle kinds,
        ``False`` otherwise (including when ``WhichOneof`` reports no field).

    """
    return event.WhichOneof("event") in _LIFECYCLE_EVENT_KINDS

65 

66 

class PublishBuildEventService(PublishBuildEventServicer, InstancedServicer[BuildEventStreamStorage]):
    """gRPC servicer implementing the PublishBuildEvent service.

    Both RPCs publish incoming events to a stream obtained from the
    ``BuildEventStreamStorage`` returned by ``self.get_instance("")``.

    """

    SERVICE_NAME = "PublishBuildEvent"
    REGISTER_METHOD = add_PublishBuildEventServicer_to_server
    FULL_NAME = PBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc()
    def PublishLifecycleEvent(self, request: PublishLifecycleEventRequest, context: grpc.ServicerContext) -> Empty:
        """Handler for PublishLifecycleEvent requests.

        This method takes a request containing a build lifecycle event, and
        uses it to update the high-level state of a build (with a
        corresponding event stream).

        If the request carries an event that is not a lifecycle event,
        nothing is published and the call is marked ``INVALID_ARGUMENT``
        with an explanatory details message.

        Args:
            request: The request whose ``build_event`` should be published.
            context: The gRPC context, used to report an invalid request.

        Returns:
            An ``Empty`` message in every case.

        """
        ordered_build_event = request.build_event
        if is_lifecycle_event(ordered_build_event.event):
            stream = self._get_stream_for_event(ordered_build_event)
            stream.publish_event(ordered_build_event)

        else:
            message = "Got a build tool event in a PublishLifecycleEvent request."
            LOGGER.warning(message)
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            # Give the client the reason, not just the bare status code.
            context.set_details(message)

        return Empty()

    @rpc()
    def PublishBuildToolEventStream(
        self,
        request_iterator: Iterable[PublishBuildToolEventStreamRequest],
        context: grpc.ServicerContext,
    ) -> Iterator[PublishBuildToolEventStreamResponse]:
        """Handler for PublishBuildToolEventStream requests.

        Each request's ordered build event is published to the stream matching
        its ``stream_id``, and is then acknowledged with a response echoing
        the event's ``stream_id`` and ``sequence_number``.

        Args:
            request_iterator: The incoming stream of requests.
            context: The gRPC context for this call (not used directly here).

        Yields:
            One ``PublishBuildToolEventStreamResponse`` per request received.

        """
        for request in request_iterator:
            LOGGER.info("Got a BuildToolEvent on the stream.")
            ordered_build_event = request.ordered_build_event

            # We don't need to give any special treatment to BuildToolEvents, so
            # just call the underlying `get_stream` method here.
            stream = self.get_instance("").get_stream(ordered_build_event.stream_id)

            # `stream` should never be `None`, but in case the internals change
            # in future let's be safe.
            if stream is not None:
                stream.publish_event(ordered_build_event)

            # The acknowledgement is sent whether or not a stream was found.
            yield PublishBuildToolEventStreamResponse(
                stream_id=ordered_build_event.stream_id,
                sequence_number=ordered_build_event.sequence_number,
            )

    def _get_stream_for_event(self, event: OrderedBuildEvent) -> BuildEventStream:
        """Return the stream that ``event`` should be published to.

        A ``build_enqueued`` event creates a new stream for the event's
        ``stream_id``; any other event looks up the stream for that id.

        """
        # If this is the start of a new build, then we want a new stream
        if event.event.WhichOneof("event") == "build_enqueued":
            return self.get_instance("").new_stream(event.stream_id)
        return self.get_instance("").get_stream(event.stream_id)

120 

121 

class QueryBuildEventsService(QueryBuildEventsServicer, InstancedServicer[BuildEventStreamStorage]):
    """gRPC servicer implementing the QueryBuildEvents service.

    Queries are answered from the ``BuildEventStreamStorage`` returned by
    ``self.get_instance("")``.

    """

    SERVICE_NAME = "QueryBuildEvents"
    REGISTER_METHOD = add_QueryBuildEventsServicer_to_server
    FULL_NAME = QBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc()
    def QueryEventStreams(
        self, request: QueryEventStreamsRequest, context: grpc.ServicerContext
    ) -> QueryEventStreamsResponse:
        """Handler for QueryEventStreams requests.

        Passes the request's ``build_id_pattern`` to the storage as the stream
        key regex, and returns each matching stream as a gRPC message.

        Args:
            request: The query, carrying ``build_id_pattern``.
            context: The gRPC context for this call (not used directly here).

        Returns:
            A ``QueryEventStreamsResponse`` holding the matching streams.

        """
        storage = self.get_instance("")
        matches = storage.get_matching_streams(stream_key_regex=request.build_id_pattern)
        stream_messages = [match.to_grpc_message() for match in matches]
        return QueryEventStreamsResponse(streams=stream_messages)

132 return QueryEventStreamsResponse(streams=[stream.to_grpc_message() for stream in streams])