Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/service.py: 39.22%

51 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17PublishBuildEvent service 

18========================= 

19 

20""" 

21 

22 

23import logging 

24from typing import Iterable 

25 

26from google.protobuf.empty_pb2 import Empty 

27import grpc 

28 

29from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import ( 

30 QueryEventStreamsRequest, 

31 QueryEventStreamsResponse 

32) 

33from buildgrid._protos.buildgrid.v2.query_build_events_pb2_grpc import ( 

34 QueryBuildEventsServicer, 

35 add_QueryBuildEventsServicer_to_server 

36) 

37from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import ( 

38 BuildEvent 

39) 

40from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2_grpc import ( 

41 PublishBuildEventServicer, 

42 add_PublishBuildEventServicer_to_server 

43) 

44from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import ( 

45 OrderedBuildEvent, 

46 PublishBuildToolEventStreamRequest, 

47 PublishBuildToolEventStreamResponse, 

48 PublishLifecycleEventRequest 

49) 

50from buildgrid.server.build_events.storage import BuildEventStream, BuildEventStreamStorage 

51 

52 

def is_lifecycle_event(event: BuildEvent):
    """Tell whether ``event`` carries a build lifecycle payload.

    The name of the populated member of the ``event`` oneof is compared
    against the lifecycle field names listed below.

    Args:
        event: The build event to classify.

    Returns:
        ``True`` when the populated ``event`` field is one of the
        lifecycle fields, ``False`` otherwise.

    """
    populated_field = event.WhichOneof("event")
    return populated_field in (
        "build_enqueued",
        "invocation_attempt_started",
        "invocation_attempt_finished",
        "build_finished",
    )

61 

62 

class PublishBuildEventService(PublishBuildEventServicer):

    """PublishBuildEvent service implementation."""

    def __init__(
        self,
        server: grpc.Server,
        stream_storage: BuildEventStreamStorage
    ):
        """Create the service and register it with ``server``.

        Args:
            server: The gRPC server this servicer is added to.
            stream_storage: Storage used to create and look up the build
                event streams that events are published to.

        """
        self._logger = logging.getLogger(__name__)
        self._streams = stream_storage

        add_PublishBuildEventServicer_to_server(self, server)
        self._logger.info("Created PublishBuildEventService")

    def PublishLifecycleEvent(
        self,
        request: PublishLifecycleEventRequest,
        context: grpc.ServicerContext
    ) -> Empty:
        """Handler for PublishLifecycleEvent requests.

        This method takes a request containing a build lifecycle event, and
        uses it to update the high-level state of a build (with a
        corresponding event stream).

        If the request carries an event that is not a lifecycle event,
        nothing is published and the call is marked ``INVALID_ARGUMENT``
        with a message explaining why.

        Args:
            request: The request whose ``build_event`` is to be published.
            context: The gRPC context for this call.

        Returns:
            An ``Empty`` message, in both the success and rejection cases.

        """
        self._logger.info("Received a PublishLifecycleEvent request")
        ordered_build_event = request.build_event
        if is_lifecycle_event(ordered_build_event.event):
            stream = self._get_stream_for_event(ordered_build_event)
            stream.publish_event(ordered_build_event)

        else:
            self._logger.warning("Got a build tool event in a PublishLifecycleEvent request")
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            # Give the client a reason alongside the status code; without
            # this the rejection arrives with no explanation.
            context.set_details(
                "PublishLifecycleEvent only accepts build lifecycle events")

        return Empty()

    def PublishBuildToolEventStream(
        self,
        request_iterator: Iterable[PublishBuildToolEventStreamRequest],
        context: grpc.ServicerContext
    ) -> Iterable[PublishBuildToolEventStreamResponse]:
        """Handler for PublishBuildToolEventStream requests.

        Each request on the incoming stream is published to the event
        stream named by its ``stream_id``, and then acknowledged with a
        response echoing its ``stream_id`` and ``sequence_number``.

        Args:
            request_iterator: The incoming stream of requests.
            context: The gRPC context for this call.

        Yields:
            One response per request, in the order they were received.

        """
        self._logger.info("Opened a BuildToolEvent stream")

        for request in request_iterator:
            self._logger.info("Got a BuildToolEvent on the stream")
            ordered_build_event = request.ordered_build_event

            # We don't need to give any special treatment to BuildToolEvents, so
            # just call the underlying `get_stream` method here.
            stream = self._streams.get_stream(ordered_build_event.stream_id)

            # `stream` should never be `None`, but in case the internals change
            # in future lets be safe.
            if stream is not None:
                stream.publish_event(ordered_build_event)
            else:
                # Make the dropped event visible rather than losing it
                # silently. The acknowledgement below is still sent, as it
                # always has been.
                self._logger.warning(
                    "No stream available for a BuildToolEvent, the event was not published")

            yield PublishBuildToolEventStreamResponse(
                stream_id=ordered_build_event.stream_id,
                sequence_number=ordered_build_event.sequence_number
            )

    def _get_stream_for_event(self, event: OrderedBuildEvent) -> BuildEventStream:
        """Return the stream that ``event`` should be published to.

        A ``build_enqueued`` event gets a stream from ``new_stream``; any
        other event gets one from ``get_stream``. Both are keyed on the
        event's ``stream_id``.

        """
        # If this is the start of a new build, then we want a new stream
        if event.event.WhichOneof("event") == "build_enqueued":
            return self._streams.new_stream(event.stream_id)

        return self._streams.get_stream(event.stream_id)

132 

133 

class QueryBuildEventsService(QueryBuildEventsServicer):

    """QueryBuildEvents service implementation."""

    def __init__(
        self,
        server: grpc.Server,
        stream_storage: BuildEventStreamStorage
    ):
        """Create the service and register it with ``server``.

        Args:
            server: The gRPC server this servicer is added to.
            stream_storage: Storage that is searched for matching streams.

        """
        self._logger = logging.getLogger(__name__)
        self._streams = stream_storage
        add_QueryBuildEventsServicer_to_server(self, server)
        self._logger.info("Created QueryBuildEventsService")

    def QueryEventStreams(
        self,
        request: QueryEventStreamsRequest,
        context: grpc.ServicerContext
    ) -> QueryEventStreamsResponse:
        """Handler for QueryEventStreams requests.

        Passes the request's ``build_id_pattern`` to the storage as
        ``stream_key_regex`` and returns every stream it gives back,
        converted with ``to_grpc_message``.

        Args:
            request: The request carrying the ``build_id_pattern``.
            context: The gRPC context for this call.

        Returns:
            A response whose ``streams`` field holds the converted streams.

        """
        self._logger.info("Got a QueryEventStreams request")

        pattern = request.build_id_pattern
        matches = self._streams.get_matching_streams(stream_key_regex=pattern)
        messages = [match.to_grpc_message() for match in matches]
        return QueryEventStreamsResponse(streams=messages)

157 )