Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/service.py: 61.54%

52 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17PublishBuildEvent service 

18========================= 

19 

20""" 

21 

22 

23from typing import Iterable, Iterator 

24 

25import grpc 

26from google.protobuf.empty_pb2 import Empty 

27 

28from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import DESCRIPTOR as QBE_DESCRIPTOR 

29from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import QueryEventStreamsRequest, QueryEventStreamsResponse 

30from buildgrid._protos.buildgrid.v2.query_build_events_pb2_grpc import ( 

31 QueryBuildEventsServicer, 

32 add_QueryBuildEventsServicer_to_server, 

33) 

34from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import BuildEvent 

35from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import DESCRIPTOR as PBE_DESCRIPTOR 

36from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import ( 

37 OrderedBuildEvent, 

38 PublishBuildToolEventStreamRequest, 

39 PublishBuildToolEventStreamResponse, 

40 PublishLifecycleEventRequest, 

41) 

42from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2_grpc import ( 

43 PublishBuildEventServicer, 

44 add_PublishBuildEventServicer_to_server, 

45) 

46from buildgrid.server.build_events.storage import BuildEventStream, BuildEventStreamStorage 

47from buildgrid.server.decorators import rpc 

48from buildgrid.server.logging import buildgrid_logger 

49from buildgrid.server.servicer import InstancedServicer 

50 

51LOGGER = buildgrid_logger(__name__) 

52 

53 

def is_lifecycle_event(event: BuildEvent) -> bool:
    """Report whether ``event`` carries a build lifecycle payload.

    The name of the populated member of the message's ``event`` oneof is
    looked up and compared against the four lifecycle event names below.

    Args:
        event: The build event to classify.

    Returns:
        ``True`` if the populated ``event`` field is one of the lifecycle
        event names, ``False`` otherwise.
    """
    populated_field = event.WhichOneof("event")
    return populated_field in (
        "build_enqueued",
        "invocation_attempt_started",
        "invocation_attempt_finished",
        "build_finished",
    )

62 

63 

class PublishBuildEventService(PublishBuildEventServicer, InstancedServicer[BuildEventStreamStorage]):
    """Servicer for the PublishBuildEvent gRPC service.

    Every handler works against the ``BuildEventStreamStorage`` instance
    obtained from ``self.get_instance("")``.
    """

    SERVICE_NAME = "PublishBuildEvent"
    REGISTER_METHOD = add_PublishBuildEventServicer_to_server
    FULL_NAME = PBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc()
    def PublishLifecycleEvent(self, request: PublishLifecycleEventRequest, context: grpc.ServicerContext) -> Empty:
        """Handler for PublishLifecycleEvent requests.

        This method takes a request containing a build lifecycle event, and
        publishes it to the event stream that corresponds to the build.

        If the request carries something other than a lifecycle event, a
        warning is logged, the status code is set to ``INVALID_ARGUMENT``
        and nothing is published.

        Args:
            request: The request wrapping the ordered build event.
            context: The gRPC context for this call.

        Returns:
            An ``Empty`` message in every case.

        """
        lifecycle_event = request.build_event

        # Reject anything that is not a lifecycle event up front.
        if not is_lifecycle_event(lifecycle_event.event):
            LOGGER.warning("Got a build tool event in a PublishLifecycleEvent request.")
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            return Empty()

        target_stream = self._get_stream_for_event(lifecycle_event)
        target_stream.publish_event(lifecycle_event)
        return Empty()

    @rpc()
    def PublishBuildToolEventStream(
        self,
        request_iterator: Iterable[PublishBuildToolEventStreamRequest],
        context: grpc.ServicerContext,
    ) -> Iterator[PublishBuildToolEventStreamResponse]:
        """Handler for PublishBuildToolEventStream requests.

        Each incoming request's ordered build event is published to the
        stream matching its ``stream_id``, and a response echoing the
        event's ``stream_id`` and ``sequence_number`` is yielded.

        Args:
            request_iterator: The incoming stream of requests.
            context: The gRPC context for this call.

        Yields:
            One ``PublishBuildToolEventStreamResponse`` per request.
        """
        for request in request_iterator:
            LOGGER.info("Got a BuildToolEvent on the stream.")
            ordered_event = request.ordered_build_event

            # BuildToolEvents need no special treatment, so the storage's
            # plain `get_stream` lookup is used directly.
            target_stream = self.get_instance("").get_stream(ordered_event.stream_id)

            # `get_stream` is not expected to return `None`; the check is a
            # safeguard in case the storage internals change in future.
            if target_stream is not None:
                target_stream.publish_event(ordered_event)

            # NOTE(review): the response is yielded whether or not a stream
            # was found -- confirm this placement against the original file.
            yield PublishBuildToolEventStreamResponse(
                stream_id=ordered_event.stream_id,
                sequence_number=ordered_event.sequence_number,
            )

    def _get_stream_for_event(self, event: OrderedBuildEvent) -> BuildEventStream:
        """Return the stream that ``event`` should be published to.

        A ``build_enqueued`` event marks the start of a new build, so a new
        stream is requested for its ``stream_id``; any other event uses the
        storage's ``get_stream`` lookup.
        """
        starts_new_build = event.event.WhichOneof("event") == "build_enqueued"
        storage = self.get_instance("")
        lookup = storage.new_stream if starts_new_build else storage.get_stream
        return lookup(event.stream_id)

117 

118 

class QueryBuildEventsService(QueryBuildEventsServicer, InstancedServicer[BuildEventStreamStorage]):
    """Servicer for the QueryBuildEvents gRPC service.

    Queries are answered from the ``BuildEventStreamStorage`` instance
    obtained from ``self.get_instance("")``.
    """

    SERVICE_NAME = "QueryBuildEvents"
    REGISTER_METHOD = add_QueryBuildEventsServicer_to_server
    FULL_NAME = QBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc()
    def QueryEventStreams(
        self, request: QueryEventStreamsRequest, context: grpc.ServicerContext
    ) -> QueryEventStreamsResponse:
        """Handler for QueryEventStreams requests.

        The request's ``build_id_pattern`` is passed to the storage as the
        ``stream_key_regex``, and each matching stream is converted to its
        gRPC message form.

        Args:
            request: The query carrying the build id pattern.
            context: The gRPC context for this call.

        Returns:
            A ``QueryEventStreamsResponse`` holding the matching streams.
        """
        storage = self.get_instance("")
        matches = storage.get_matching_streams(stream_key_regex=request.build_id_pattern)
        messages = [match.to_grpc_message() for match in matches]
        return QueryEventStreamsResponse(streams=messages)

129 return QueryEventStreamsResponse(streams=[stream.to_grpc_message() for stream in streams])