Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/service.py: 61.54%
52 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-08-20 14:29 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-08-20 14:29 +0000
1# Copyright (C) 2021 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17PublishBuildEvent service
18=========================
20"""
23from typing import Iterable, Iterator
25import grpc
26from google.protobuf.empty_pb2 import Empty
28from buildgrid._protos.build.buildgrid.query_build_events_pb2 import DESCRIPTOR as QBE_DESCRIPTOR
29from buildgrid._protos.build.buildgrid.query_build_events_pb2 import (
30 QueryEventStreamsRequest,
31 QueryEventStreamsResponse,
32)
33from buildgrid._protos.build.buildgrid.query_build_events_pb2_grpc import (
34 QueryBuildEventsServicer,
35 add_QueryBuildEventsServicer_to_server,
36)
37from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import BuildEvent
38from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import DESCRIPTOR as PBE_DESCRIPTOR
39from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import (
40 OrderedBuildEvent,
41 PublishBuildToolEventStreamRequest,
42 PublishBuildToolEventStreamResponse,
43 PublishLifecycleEventRequest,
44)
45from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2_grpc import (
46 PublishBuildEventServicer,
47 add_PublishBuildEventServicer_to_server,
48)
49from buildgrid.server.build_events.storage import BuildEventStream, BuildEventStreamStorage
50from buildgrid.server.decorators import rpc
51from buildgrid.server.logging import buildgrid_logger
52from buildgrid.server.servicer import InstancedServicer
54LOGGER = buildgrid_logger(__name__)
57def is_lifecycle_event(event: BuildEvent) -> bool:
58 lifecycle_events = [
59 "build_enqueued",
60 "invocation_attempt_started",
61 "invocation_attempt_finished",
62 "build_finished",
63 ]
64 return event.WhichOneof("event") in lifecycle_events
67class PublishBuildEventService(PublishBuildEventServicer, InstancedServicer[BuildEventStreamStorage]):
68 SERVICE_NAME = "PublishBuildEvent"
69 REGISTER_METHOD = add_PublishBuildEventServicer_to_server
70 FULL_NAME = PBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name
72 @rpc()
73 def PublishLifecycleEvent(self, request: PublishLifecycleEventRequest, context: grpc.ServicerContext) -> Empty:
74 """Handler for PublishLifecycleEvent requests.
76 This method takes a request containing a build lifecycle event, and
77 uses it to update the high-level state of a build (with a corresponding)
78 event stream.
80 """
81 ordered_build_event = request.build_event
82 if is_lifecycle_event(ordered_build_event.event):
83 stream = self._get_stream_for_event(ordered_build_event)
84 stream.publish_event(ordered_build_event)
86 else:
87 LOGGER.warning("Got a build tool event in a PublishLifecycleEvent request.")
88 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
90 return Empty()
92 @rpc()
93 def PublishBuildToolEventStream(
94 self,
95 request_iterator: Iterable[PublishBuildToolEventStreamRequest],
96 context: grpc.ServicerContext,
97 ) -> Iterator[PublishBuildToolEventStreamResponse]:
98 for request in request_iterator:
99 LOGGER.info("Got a BuildToolEvent on the stream.")
101 # We don't need to give any special treatment to BuildToolEvents, so
102 # just call the underlying `get_stream` method here.
103 stream = self.get_instance("").get_stream(request.ordered_build_event.stream_id)
105 # `stream` should never be `None`, but in case the internals change
106 # in future lets be safe.
107 if stream is not None:
108 stream.publish_event(request.ordered_build_event)
110 yield PublishBuildToolEventStreamResponse(
111 stream_id=request.ordered_build_event.stream_id,
112 sequence_number=request.ordered_build_event.sequence_number,
113 )
115 def _get_stream_for_event(self, event: OrderedBuildEvent) -> BuildEventStream:
116 # If this is the start of a new build, then we want a new stream
117 if event.event.WhichOneof("event") == "build_enqueued":
118 return self.get_instance("").new_stream(event.stream_id)
119 return self.get_instance("").get_stream(event.stream_id)
122class QueryBuildEventsService(QueryBuildEventsServicer, InstancedServicer[BuildEventStreamStorage]):
123 SERVICE_NAME = "QueryBuildEvents"
124 REGISTER_METHOD = add_QueryBuildEventsServicer_to_server
125 FULL_NAME = QBE_DESCRIPTOR.services_by_name[SERVICE_NAME].full_name
127 @rpc()
128 def QueryEventStreams(
129 self, request: QueryEventStreamsRequest, context: grpc.ServicerContext
130 ) -> QueryEventStreamsResponse:
131 streams = self.get_instance("").get_matching_streams(stream_key_regex=request.build_id_pattern)
132 return QueryEventStreamsResponse(streams=[stream.to_grpc_message() for stream in streams])