Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/storage.py: 46.15%

52 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import re 

17from threading import RLock 

18from typing import Dict, Iterator, List, Optional 

19 

20from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import QueryEventStreamsResponse 

21from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import StreamId 

22from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import OrderedBuildEvent 

23from buildgrid.server.logging import buildgrid_logger 

24from buildgrid.server.servicer import Instance 

25 

# Module-level logger; BuildEventStream uses it for its debug messages.
LOGGER = buildgrid_logger(__name__)

27 

28 

class DuplicateStreamError(Exception):
    """Raised when two event streams would be stored under the same stream key."""

33 

34 

class BuildEventStream:
    """Internal representation of a stream of OrderedBuildEvents.

    This class provides in-memory storage of the events in a given Build Event
    Stream. Many of these Build Event Streams may relate to the same build, or
    even to the same invocation of a tool.

    Args:
        stream_id (StreamId): A gRPC message defining the ID of this stream.

    """

    def __init__(self, stream_id: StreamId):
        self._stream_id = stream_id
        # Events are appended by ``publish_event``, so this list is kept in
        # publication order.
        self._events: List[OrderedBuildEvent] = []
        LOGGER.debug("Created BuildEventStream.", tags=dict(stream_id=self._stream_id))

    def __len__(self) -> int:
        """Return the number of events stored in this stream."""
        return len(self._events)

    def publish_event(self, event: OrderedBuildEvent) -> None:
        """Publish an ``OrderedBuildEvent`` into the stream.

        The event is appended after any previously published events.

        Args:
            event (OrderedBuildEvent): The event to publish to the stream.
                This is an ``OrderedBuildEvent`` message from the
                ``publish_build_event`` proto.
        """
        self._events.append(event)
        LOGGER.debug("Stored BuildEvent in stream.", tags=dict(stream_id=self._stream_id))

    def query_events(self, query: Optional[str] = None) -> Iterator[OrderedBuildEvent]:
        """Query the contents of this stream.

        .. note::
            Filtering is not implemented yet. The only supported call leaves
            ``query`` as ``None``. That call returns an iterator over every
            event in the stream, in the order the events were published.

        Args:
            query (str): The filter string to use when querying events.
                Must currently be ``None``.

        Returns:
            An iterator over the ``OrderedBuildEvent`` messages in this stream.

        Raises:
            NotImplementedError: If ``query`` is not ``None``. The error is
                raised by the call itself, not deferred until the returned
                iterator is first advanced.
        """
        # TODO(SotK): Implement some basic querying here
        if query is not None:
            raise NotImplementedError("Specifying a build events query is not supported yet.")
        # A plain (non-generator) method so that the check above fails fast;
        # a generator would only run it on the first ``next()``.
        return iter(self._events)

    def to_grpc_message(self) -> QueryEventStreamsResponse.BuildEventStream:
        """Convert this object to a ``BuildEventStream`` gRPC message.

        This method converts this internal event stream representation into
        a ``QueryEventStreamsResponse.BuildEventStream`` gRPC message, as
        defined in the ``query_build_events`` proto. The message carries this
        stream's ID and all of its stored events.

        """
        return QueryEventStreamsResponse.BuildEventStream(stream_id=self._stream_id, events=self._events)

94 

95 

class BuildEventStreamStorage(Instance):
    """In-memory storage of Build Event Streams.

    This class stores a collection of Build Event Streams, and handles both
    creation of new streams and querying the streams which already exist
    based on their stream ID. Streams are stored in-memory and are lost on
    service restart, so this storage class shouldn't be relied on as a
    source of persistent data.

    This class is similar to the ``.*Instance`` classes used by other
    BuildGrid services, in that it is instantiated by the config parser
    and used by a ``.*Service`` class instantiated by the server class.
    Unlike the other instance classes however, this class doesn't have an
    ``instance_name`` attribute due to the Build Events protos not having
    the concept of multiple instances.

    """

    # Hack in a service name here to fit the pattern
    SERVICE_NAME = "BuildEvents"

    def __init__(self) -> None:
        # Maps stream keys (see ``_get_stream_key``) to their streams.
        self._streams: Dict[str, BuildEventStream] = {}
        # Guards ``self._streams``. Reentrant because ``get_stream`` calls
        # ``new_stream`` while it already holds the lock.
        self._streams_lock = RLock()

    def new_stream(self, stream_id: StreamId) -> BuildEventStream:
        """Create and return a new ``BuildEventStream`` with the given ID.

        This method creates a new :class:`BuildEventStream` with the given
        ``StreamId``, stores it, and returns it. If a stream with the same
        key already exists in this ``BuildEventStreamStorage``, a
        :class:`DuplicateStreamError` is raised instead.

        Args:
            stream_id (StreamId): The gRPC StreamId message containing the
                ID of the stream to create.

        Returns:
            The newly created ``BuildEventStream``.

        Raises:
            DuplicateStreamError: If a stream with the same key already exists.

        """
        key = self._get_stream_key(stream_id)
        with self._streams_lock:
            if key in self._streams:
                raise DuplicateStreamError(f"Stream with key {key} already exists.")

            stream = BuildEventStream(stream_id)
            self._streams[key] = stream
        return stream

    def get_stream(self, stream_id: StreamId) -> BuildEventStream:
        """Return the ``BuildEventStream`` with the given stream ID, creating it if needed.

        This method converts the stream ID to a stream key and returns the
        stream stored under that key. If no such stream exists yet, it
        creates, stores, and returns a new :class:`BuildEventStream` via
        :meth:`new_stream`.

        Args:
            stream_id (StreamId): The StreamID message from an event to
                get the ``BuildEventStream`` for.

        Returns:
            The existing or newly created ``BuildEventStream`` for
            ``stream_id``. This method never returns ``None``.
        """
        key = self._get_stream_key(stream_id)
        with self._streams_lock:
            # The lookup and the creation happen under one lock hold. This
            # stops two callers from both trying to create the same stream,
            # which would raise DuplicateStreamError for one of them.
            stream = self._streams.get(key)
            if stream is None:
                stream = self.new_stream(stream_id)
        return stream

    def get_matching_streams(self, stream_key_regex: str) -> List[BuildEventStream]:
        """Return a list of event streams which match the given key pattern.

        This method takes a regular expression as a string and returns the
        :class:`BuildEventStream` objects whose stream key matches it. The
        key is derived from the events' StreamId. Matching uses
        ``re.search``, so the pattern may match anywhere in the key unless
        it is anchored.

        Args:
            stream_key_regex (str): A regular expression to use to find
                matching streams.

        Returns:
            List of :class:`BuildEventStream` objects which have a key
            matching the given regex.

        Raises:
            re.error: If ``stream_key_regex`` is not a valid regular expression.

        """
        regex = re.compile(stream_key_regex)
        # Snapshot under the lock. Iterating the live dict while another
        # thread adds a stream can raise "dictionary changed size during
        # iteration".
        with self._streams_lock:
            candidates = list(self._streams.items())
        return [stream for key, stream in candidates if regex.search(key) is not None]

    def _get_stream_key(self, stream_id: StreamId) -> str:
        """Build the storage key for ``stream_id``.

        The key joins ``build_id``, ``component`` and ``invocation_id`` with
        ``"."``.
        """
        # NOTE(review): if these fields can themselves contain ".", two
        # distinct StreamIds could map to the same key. Confirm whether
        # that can happen.
        return f"{stream_id.build_id}.{stream_id.component}.{stream_id.invocation_id}"