Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/build_events/storage.py: 42.59%

54 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import logging 

17import re 

18from threading import RLock 

19from typing import Iterator, List, Optional, TYPE_CHECKING 

20 

21from buildgrid._protos.buildgrid.v2.query_build_events_pb2 import QueryEventStreamsResponse 

22from buildgrid._protos.google.devtools.build.v1.build_events_pb2 import StreamId 

23from buildgrid._protos.google.devtools.build.v1.publish_build_event_pb2 import OrderedBuildEvent 

24 

25if TYPE_CHECKING: 

26 from buildgrid.server.server import Server 

27 

28 

class DuplicateStreamError(Exception):
    """Error when encountering a name collision between event streams."""

32 

33 

class BuildEventStream:
    """Internal representation of a stream of OrderedBuildEvents.

    This class provides in-memory storage of the events in a given Build Event
    Stream. Many of these Build Event Streams may relate to the same build, or
    even to the same invocation of a tool.

    Args:
        stream_id (StreamId): A gRPC message defining the ID of this stream.

    """

    def __init__(self, stream_id: StreamId) -> None:
        self._logger = logging.getLogger(__name__)
        self._stream_id = stream_id
        # Events are kept in the order in which they were published.
        self._events: List[OrderedBuildEvent] = []
        # Lazy %-style arguments: the stream ID is only converted to text
        # when DEBUG logging is actually enabled.
        self._logger.debug("Created BuildEventStream for [%s]", self._stream_id)

    def __len__(self) -> int:
        """Return the number of events currently stored in this stream."""
        return len(self._events)

    def publish_event(self, event: OrderedBuildEvent) -> None:
        """Publish an ``OrderedBuildEvent`` into the stream.

        The event is appended to the end of the in-memory event list.

        Args:
            event (OrderedBuildEvent): The event to publish to the stream.
                This is an ``OrderedBuildEvent`` message from the
                ``publish_build_event`` proto.
        """
        self._events.append(event)
        self._logger.debug("Stored BuildEvent in stream [%s]", self._stream_id)

    def query_events(self, query: Optional[str] = None) -> Iterator[OrderedBuildEvent]:
        """Query the contents of this stream.

        Filter the contents of the event stream by some query, returning an
        iterator of matching OrderedBuildEvents.

        .. note::
            The filtering functionality of this method is currently not
            implemented. Only ``query=None`` is accepted, in which case the
            iterator yields every event in the stream.

        .. note::
            This method is a generator, so nothing in its body runs until
            the returned iterator is first advanced. In particular, the
            ``NotImplementedError`` for a non-``None`` query is raised on
            first iteration rather than when this method is called.

        Args:
            query (str): The filter string to use when querying events.
                Must currently be ``None``.

        Raises:
            NotImplementedError: On first iteration, if ``query`` is not
                ``None``.
        """
        # TODO(SotK): Implement some basic querying here
        if query is not None:
            raise NotImplementedError(
                "Specifying a build events query is not supported yet.")
        yield from self._events

    def to_grpc_message(self) -> QueryEventStreamsResponse.BuildEventStream:
        """Convert this object to a ``BuildEventStream`` gRPC message.

        This method converts this internal event stream representation into
        a ``QueryEventStreamsResponse.BuildEventStream`` gRPC message, as
        defined in the ``query_build_events`` proto.

        Returns:
            A ``QueryEventStreamsResponse.BuildEventStream`` message built
            from this stream's ID and its stored events.

        """
        return QueryEventStreamsResponse.BuildEventStream(
            stream_id=self._stream_id,
            events=self._events
        )

99 

100 

class BuildEventStreamStorage:
    """In-memory storage of Build Event Streams.

    This class stores a collection of Build Event Streams, and handles both
    creation of new streams and querying the streams which already exist
    based on their stream ID. Streams are stored in-memory and are lost on
    service restart, so shouldn't be relied on as a source of persistent
    data when using this storage class.

    This class is similar to the ``.*Instance`` classes used by other
    BuildGrid services, in that it is instantiated by the config parser
    and used by a ``.*Service`` class instantiated by the server class.
    Unlike the other instance classes however, this class doesn't have an
    ``instance_name`` attribute due to the Build Events protos not having
    the concept of multiple instances.

    """

    def __init__(self) -> None:
        # Maps a stream key (as built by ``_get_stream_key``) to its
        # BuildEventStream.
        self._streams = {}
        # Re-entrant, because ``get_stream`` calls ``new_stream`` while
        # already holding this lock.
        self._streams_lock = RLock()

    def register_instance_with_server(self, instance_name: str, server: "Server") -> None:
        """Register this BuildEventStreamStorage with the BuildGrid server

        This method doesn't make a lot of sense here since this isn't really
        an "instance" in the same sense as other services have in BuildGrid.

        The Build Events protocol doesn't have support for instance names,
        so this is the closest thing to an instance we have however. As
        such this method is implemented to avoid a special case in the
        config parsing.

        The provided instance name is currently ignored.

        Args:
            instance_name (str): The name of the instance this service is
                defined in. Currently ignored.
            server (Server): The BuildGrid server to register this storage
                backend with.

        """
        server.add_build_events_storage(self)

    def new_stream(self, stream_id: StreamId) -> BuildEventStream:
        """Create and return a new ``BuildEventStream`` with the given ID.

        This method creates a new :class:`BuildEventStream` with the given
        ``StreamId``, and returns it. If a stream with that ID already exists
        in this ``BuildEventStreamStorage``, then a :class:`DuplicateStreamError`
        is raised.

        Args:
            stream_id (StreamId): The gRPC StreamId message containing the
                ID of the stream to create.

        Returns:
            The newly created :class:`BuildEventStream`.

        Raises:
            DuplicateStreamError: If a stream with the same key is already
                stored.

        """
        key = self._get_stream_key(stream_id)
        # The existence check and the insertion happen under one lock so
        # that two callers cannot both create a stream for the same key.
        with self._streams_lock:
            if key in self._streams:
                raise DuplicateStreamError(f"Stream with key {key} already exists.")

            stream = BuildEventStream(stream_id)
            self._streams[key] = stream
            return stream

    def get_stream(self, stream_id: StreamId) -> BuildEventStream:
        """Return a ``BuildEventStream`` with the given stream ID.

        This method takes a stream ID, converts it to a stream key, and
        returns the stream with that key if one exists.

        This method will create a new :class:`BuildEventStream` if one with
        the given ID doesn't exist.

        Args:
            stream_id (StreamId): The StreamID message from an event to
                get the ``BuildEventStream`` for.

        Returns:
            The existing ``BuildEventStream`` with the given ID, or a newly
            created one if none existed.
        """
        key = self._get_stream_key(stream_id)
        with self._streams_lock:
            stream = self._streams.get(key)
            if stream is None:
                # Still holding the (re-entrant) lock, so the lookup and
                # the creation form a single atomic step.
                stream = self.new_stream(stream_id)
            return stream

    def get_matching_streams(self, stream_key_regex: str) -> List[BuildEventStream]:
        """Return a list of event streams which match the given key pattern.

        This method takes a regular expression as a string, and returns
        a list of :class:`BuildEventStream` objects whose stream key (based
        on the events' StreamId) matches the regex. The pattern is applied
        with ``search``, so it may match anywhere within the key.

        Args:
            stream_key_regex (str): A regular expression to use to find
                matching streams.

        Returns:
            List of :class:`BuildEventStream` objects which have a key
            matching the given regex.

        Raises:
            re.error: If ``stream_key_regex`` is not a valid regular
                expression.

        """
        # Compile before taking the lock so an invalid pattern fails fast
        # without holding up other users of the storage.
        regex = re.compile(stream_key_regex)
        # Iterate under the same lock used by new_stream/get_stream; a
        # stream being added mid-iteration would otherwise raise
        # "dictionary changed size during iteration".
        with self._streams_lock:
            return [
                stream for key, stream in self._streams.items()
                if regex.search(key) is not None
            ]

    def _get_stream_key(self, stream_id: StreamId) -> str:
        """Build the storage key for a stream ID.

        The key is the stream ID's ``build_id``, ``component`` and
        ``invocation_id`` fields joined with ``.`` in that order.

        Args:
            stream_id (StreamId): The stream ID to build a key for.

        Returns:
            The key string used to index this storage's streams.
        """
        return f"{stream_id.build_id}.{stream_id.component}.{stream_id.invocation_id}"