Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/storage.py: 46.15%
52 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2021 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import re
17from threading import RLock
18from typing import Dict, Iterator, List, Optional
20from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import QueryEventStreamsResponse
21from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import StreamId
22from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import OrderedBuildEvent
23from buildgrid.server.logging import buildgrid_logger
24from buildgrid.server.servicer import Instance
26LOGGER = buildgrid_logger(__name__)
29class DuplicateStreamError(Exception):
30 """Error when encountering a name collision between event streams."""
32 pass
35class BuildEventStream:
36 """Internal representation of a stream of OrderedBuildEvents.
38 This class provides in-memory storage of the events in a given Build Event
39 Stream. Many of these Build Event Streams may relate to the same build, or
40 even to the same invocation of a tool.
42 Args:
43 stream_id (StreamId): A gRPC message defining the ID of this stream.
45 """
47 def __init__(self, stream_id: StreamId):
48 self._stream_id = stream_id
49 self._events: List[OrderedBuildEvent] = []
50 LOGGER.debug("Created BuildEventStream.", tags=dict(stream_id=self._stream_id))
52 def __len__(self) -> int:
53 return len(self._events)
55 def publish_event(self, event: OrderedBuildEvent) -> None:
56 """Publish an ``OrderedBuildEvent`` into the stream.
58 Args:
59 event (OrderedBuildEvent): The event to publish to the stream.
60 This is an ``OrderedBuildEvent`` message from the
61 ``publish_build_event`` proto.
62 """
63 self._events.append(event)
64 LOGGER.debug("Stored BuildEvent in stream.", tags=dict(stream_id=self._stream_id))
66 def query_events(self, query: Optional[str] = None) -> Iterator[OrderedBuildEvent]:
67 """Query the contents of this stream.
69 Filter the contents of the event stream by some query, returning an
70 iterator of matching OrderedBuildEvents.
72 .. note::
73 The filtering functionality of this method is currently not
74 implemented, and the iterator returned contains all the events
75 in the stream no matter what query is used.
77 Args:
78 query (str): The filter string to use when querying events.
79 """
80 # TODO(SotK): Implement some basic querying here
81 if query is not None:
82 raise NotImplementedError("Specifying a build events query is not supported yet.")
83 yield from self._events
85 def to_grpc_message(self) -> QueryEventStreamsResponse.BuildEventStream:
86 """Convert this object to a ``BuildEventStream`` gRPC message.
88 This method converts this internal event stream representation into
89 a ``QueryEventStreamsResponse.BuildEventStream`` gRPC message, as
90 defined in the ``query_build_events`` proto.
92 """
93 return QueryEventStreamsResponse.BuildEventStream(stream_id=self._stream_id, events=self._events)
96class BuildEventStreamStorage(Instance):
97 """In-memory storage of Build Event Streams.
99 This class stores a collection of Build Event Streams, and handles both
100 creation of new streams and querying the streams which already exist
101 based on their stream ID. Streams are stored in-memory and are lost on
102 service restart, so shouldn't be relied on as a source of persistent
103 data when using this storage class.
105 This class is similar to the ``.*Instance`` classes used by other
106 BuildGrid services, in that it is instantiated by the config parser
107 and used by a ``.*Service`` class instantiated by the server class.
108 Unlike the other instance classes however, this class doesn't have an
109 ``instance_name`` attribute due to the Build Events protos not having
110 the concept of multiple instances.
112 """
114 # Hack in a service name here to fit the pattern
115 SERVICE_NAME = "BuildEvents"
117 def __init__(self) -> None:
118 self._streams: Dict[str, BuildEventStream] = {}
119 self._streams_lock = RLock()
121 def new_stream(self, stream_id: StreamId) -> BuildEventStream:
122 """Create and return a new ``BuildEventStream`` with the given ID.
124 This method creates a new :class:`BuildEventStream` with the given
125 ``StreamId``, and returns it. If a stream with that ID already exists
126 in this ``BuildEventStreamStorage``, then a :class:`DuplicateStreamError`
127 is raised.
129 Args:
130 stream_id (StreamId): The gRPC StreamId message containing the
131 ID of the stream to create.
133 """
134 key = self._get_stream_key(stream_id)
135 with self._streams_lock:
136 if key in self._streams:
137 raise DuplicateStreamError(f"Stream with key {key} already exists.")
139 stream = BuildEventStream(stream_id)
140 self._streams[key] = stream
141 return stream
143 def get_stream(self, stream_id: StreamId) -> BuildEventStream:
144 """Return a ``BuildEventStream`` with the given stream ID.
146 This method takes a stream ID, converts it to a stream key, and
147 returns the stream with that key if one exists.
149 This method will create a new :class:`BuildEventStream` if one with
150 the given ID doesn't exist.
152 Args:
153 stream_id (StreamId): The StreamID message from an event to
154 get the ``BuildEventStream`` for.
156 Returns:
157 A ``BuildEventStream`` with the given ID, or None.
158 """
159 key = self._get_stream_key(stream_id)
160 with self._streams_lock:
161 stream = self._streams.get(key)
162 if stream is None:
163 stream = self.new_stream(stream_id)
164 return stream
166 def get_matching_streams(self, stream_key_regex: str) -> List[BuildEventStream]:
167 """Return a list of event streams which match the given key pattern.
169 This method takes a regular expression as a string, and returns
170 a list of :class:`BuildEventStream` objects whose stream key (based
171 on the events' StreamId) matches the regex.
173 Args:
174 stream_key_regex (str): A regular expression to use to find
175 matching streams.
177 Returns:
178 List of :class:`BuildEventStream` objects which have a key
179 matching the given regex.
181 """
182 regex = re.compile(stream_key_regex)
183 return [stream for key, stream in self._streams.items() if regex.search(key) is not None]
185 def _get_stream_key(self, stream_id: StreamId) -> str:
186 return f"{stream_id.build_id}.{stream_id.component}.{stream_id.invocation_id}"