Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/service.py: 61.54%
52 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2021 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17PublishBuildEvent service
18=========================
20"""
23from typing import Iterable, Iterator
25import grpc
26from google.protobuf.empty_pb2 import Empty
28from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import DESCRIPTOR as QBE_DESCRIPTOR
29from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import QueryEventStreamsRequest, QueryEventStreamsResponse
30from buildgrid._protos.buildgrid.v2.query_build_events_pb2_grpc import (
31 QueryBuildEventsServicer,
32 add_QueryBuildEventsServicer_to_server,
33)
34from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import BuildEvent
35from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import DESCRIPTOR as PBE_DESCRIPTOR
36from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import (
37 OrderedBuildEvent,
38 PublishBuildToolEventStreamRequest,
39 PublishBuildToolEventStreamResponse,
40 PublishLifecycleEventRequest,
41)
42from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2_grpc import (
43 PublishBuildEventServicer,
44 add_PublishBuildEventServicer_to_server,
45)
46from buildgrid.server.build_events.storage import BuildEventStream, BuildEventStreamStorage
47from buildgrid.server.decorators import rpc
48from buildgrid.server.logging import buildgrid_logger
49from buildgrid.server.servicer import InstancedServicer
# Module-level logger created from this module's name; the RPC handlers
# below use it to report incoming events and rejected requests.
LOGGER = buildgrid_logger(__name__)
def is_lifecycle_event(event: BuildEvent) -> bool:
    """Report whether ``event`` carries one of the build lifecycle payloads.

    The name of the populated member of the ``event`` oneof is obtained via
    ``WhichOneof`` and compared against the lifecycle field names.

    Args:
        event: The build event to classify.

    Returns:
        ``True`` when the populated field is ``build_enqueued``,
        ``invocation_attempt_started``, ``invocation_attempt_finished`` or
        ``build_finished``; ``False`` for any other value.
    """
    populated_field = event.WhichOneof("event")
    return populated_field in (
        "build_enqueued",
        "invocation_attempt_started",
        "invocation_attempt_finished",
        "build_finished",
    )
class PublishBuildEventService(PublishBuildEventServicer, InstancedServicer[BuildEventStreamStorage]):
    """gRPC servicer for the ``PublishBuildEvent`` service.

    Incoming build events are published to streams obtained from the
    ``BuildEventStreamStorage`` instance registered under the empty
    instance name (``""``).
    """

    SERVICE_NAME = "PublishBuildEvent"
    REGISTER_METHOD = add_PublishBuildEventServicer_to_server
    FULL_NAME = PBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc()
    def PublishLifecycleEvent(self, request: PublishLifecycleEventRequest, context: grpc.ServicerContext) -> Empty:
        """Handler for PublishLifecycleEvent requests.

        This method takes a request containing a build lifecycle event, and
        uses it to update the high-level state of a build (with a
        corresponding event stream).

        A request whose event is not a lifecycle event is not published to
        any stream; instead the RPC status is set to ``INVALID_ARGUMENT``
        with a details message explaining the rejection.

        Args:
            request: The request wrapping the ordered build event.
            context: The gRPC context used to report an invalid request.

        Returns:
            An ``Empty`` message, in both the accepted and rejected cases.

        """
        ordered_build_event = request.build_event
        if is_lifecycle_event(ordered_build_event.event):
            stream = self._get_stream_for_event(ordered_build_event)
            stream.publish_event(ordered_build_event)

        else:
            LOGGER.warning("Got a build tool event in a PublishLifecycleEvent request.")
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            # Give the client a reason alongside the status code; without
            # details the caller only sees a bare INVALID_ARGUMENT.
            context.set_details("PublishLifecycleEvent only accepts build lifecycle events.")

        return Empty()

    @rpc()
    def PublishBuildToolEventStream(
        self,
        request_iterator: Iterable[PublishBuildToolEventStreamRequest],
        context: grpc.ServicerContext,
    ) -> Iterator[PublishBuildToolEventStreamResponse]:
        """Handler for PublishBuildToolEventStream requests.

        Each request on the incoming stream is published to the event stream
        identified by its ``stream_id`` and then acknowledged.

        Args:
            request_iterator: The incoming stream of requests, each carrying
                one ordered build event.
            context: The gRPC context for this call (unused here).

        Yields:
            One response per request, echoing the ``stream_id`` and
            ``sequence_number`` of the event that was just handled.

        """
        for request in request_iterator:
            LOGGER.info("Got a BuildToolEvent on the stream.")

            # We don't need to give any special treatment to BuildToolEvents, so
            # just call the underlying `get_stream` method here.
            stream = self.get_instance("").get_stream(request.ordered_build_event.stream_id)

            # `stream` should never be `None`, but in case the internals change
            # in future lets be safe.
            if stream is not None:
                stream.publish_event(request.ordered_build_event)

            yield PublishBuildToolEventStreamResponse(
                stream_id=request.ordered_build_event.stream_id,
                sequence_number=request.ordered_build_event.sequence_number,
            )

    def _get_stream_for_event(self, event: OrderedBuildEvent) -> BuildEventStream:
        """Return the stream that ``event`` should be published to.

        Args:
            event: The ordered build event about to be published.

        Returns:
            A stream freshly created via ``new_stream`` when the event is a
            ``build_enqueued`` event, otherwise whatever ``get_stream``
            returns for the event's ``stream_id``.

        """
        storage = self.get_instance("")
        # If this is the start of a new build, then we want a new stream
        if event.event.WhichOneof("event") == "build_enqueued":
            return storage.new_stream(event.stream_id)
        return storage.get_stream(event.stream_id)
class QueryBuildEventsService(QueryBuildEventsServicer, InstancedServicer[BuildEventStreamStorage]):
    """gRPC servicer for the ``QueryBuildEvents`` service.

    Queries are answered from the ``BuildEventStreamStorage`` instance
    registered under the empty instance name (``""``).
    """

    SERVICE_NAME = "QueryBuildEvents"
    REGISTER_METHOD = add_QueryBuildEventsServicer_to_server
    FULL_NAME = QBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name

    @rpc()
    def QueryEventStreams(
        self, request: QueryEventStreamsRequest, context: grpc.ServicerContext
    ) -> QueryEventStreamsResponse:
        """Handler for QueryEventStreams requests.

        Args:
            request: The query; its ``build_id_pattern`` is passed to the
                storage as the ``stream_key_regex`` argument.
            context: The gRPC context for this call (unused here).

        Returns:
            A response holding the gRPC message form of every stream the
            storage reported as matching.

        """
        storage = self.get_instance("")
        matched_streams = storage.get_matching_streams(stream_key_regex=request.build_id_pattern)
        stream_messages = [matched.to_grpc_message() for matched in matched_streams]
        return QueryEventStreamsResponse(streams=stream_messages)