# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
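
"""In-memory LogStream storage backend for BuildGrid.

Keeps each LogStream's bytes in an in-process ``bytearray``, tracks the
number of connected readers per stream, and evicts reader-less streams in
insertion order once the number of streams reaches a configurable limit.
"""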

from collections import OrderedDict
from itertools import islice
import logging
from threading import Event, Lock
from typing import Dict, Optional

from buildgrid.settings import MAX_LOGSTREAMS_BEFORE_CLEANUP
from buildgrid._exceptions import (NotFoundError, StreamAlreadyExistsError,
                                   StreamFinishedError, StreamWritePendingError)
from buildgrid.server.cas.logstream.stream_storage.stream_storage_abc import (
    StreamStorageABC, StreamHandle, StreamLength, StreamChunk)


class _MemoryStream:
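    """A single in-memory LogStream.

    Writers append bytes to an internal ``bytearray`` under a lock, while
    readers take consistent ``StreamLength`` snapshots and can block on an
    ``Event`` until more data has been committed.
    """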

    def __init__(self, stream_handle: StreamHandle):
        self._stream_handle = stream_handle
        self._stream_length = StreamLength(length=0, finished=False)

        self._read_ready = Event()
        self._writer_lock = Lock()
        self._streamlength_snapshot_lock = Lock()
        self._data: bytearray = bytearray()

    @property
    def finished(self) -> bool:
        return self._stream_length.finished

    @property
    def streamlength(self) -> StreamLength:
        # Return a consistent snapshot so callers never observe a
        # half-updated (length, finished) pair
        with self._streamlength_snapshot_lock:
            return StreamLength(**self._stream_length._asdict())

    @property
    def name(self) -> str:
        return self._stream_handle.name

    @property
    def write_name(self) -> str:
        return self._stream_handle.write_resource_name

    def append(self, message: Optional[bytes] = None, mark_finished: bool = False) -> None:
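        """Append ``message`` to the stream and optionally mark it finished.

        Raises ``StreamFinishedError`` if the stream has already been marked
        as finished. Wakes up any readers blocked waiting for new data.
        """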

        if self.streamlength.finished:
            raise StreamFinishedError(
                f'The LogStream with the parent=[{self.write_name}] '
                'has already been marked as finished, cannot append')

        # Append data and update length/finished under the writer lock
        with self._writer_lock:
            if message:
                self._data.extend(message)
            with self._streamlength_snapshot_lock:
                new_length = self._stream_length.length
                if message:
                    new_length += len(message)
                new_finished = bool(mark_finished)

                self._stream_length = StreamLength(new_length, new_finished)

            # Inform anyone that was waiting on data to check
            self._read_ready.set()
            self._read_ready.clear()

    def _get_chunk(self,
                   streamlength_snapshot: StreamLength,
                   offset: int,
                   end_offset: Optional[int] = None) -> Optional[StreamChunk]:
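        """Return the chunk at ``offset``, or ``None`` if it hasn't been
        written yet.

        Raises ``StreamFinishedError`` if the stream finished before reaching
        ``offset``, since the requested bytes can never arrive.
        """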

        if streamlength_snapshot.length >= offset:
            # If we have a message starting at offset `offset`, return
            # message[offset:end_offset].
            # When `end_offset` > `stream_length`, this returns the message
            # up to `stream_length`.
            data_chunk = bytes(self._data[offset:end_offset])
            return StreamChunk(data_chunk,
                               len(data_chunk),
                               streamlength_snapshot.finished)
        elif streamlength_snapshot.finished:
            # The stream has finished and we didn't get enough bytes,
            # so we are done!
            raise StreamFinishedError(
                f'The LogStream with the parent=[{self.write_name}] '
                'has already been marked as finished and its length is '
                f'length=[{streamlength_snapshot.length}], '
                f'cannot read message at offset=[{offset}].')
        else:
            return None

    def read_chunk(self, offset: int = 0, limit: Optional[int] = None) -> StreamChunk:
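        """Read up to ``limit`` bytes starting at ``offset``, without blocking.

        Raises ``StreamWritePendingError`` if the requested bytes haven't
        been committed to the stream yet.
        """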

        end_offset = None
        if limit:
            end_offset = offset + limit

        streamlength_snapshot = self.streamlength
        chunk = self._get_chunk(streamlength_snapshot, offset, end_offset)

        if chunk:
            return chunk

        raise StreamWritePendingError(
            f'Only length=[{streamlength_snapshot.length}] bytes '
            f'were committed to the LogStream with the parent=[{self.write_name}] '
            f'so far, writes starting at offset=[{offset}] are still pending.')

    def read_chunk_blocking(self,
                            offset: int = 0,
                            limit: Optional[int] = None) -> StreamChunk:
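        """Read up to ``limit`` bytes starting at ``offset``, blocking until
        the data is available, the stream finishes, or the wait times out.
        """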

        max_end_offset = None
        if limit:
            max_end_offset = offset + limit

        streamlength_snapshot = self.streamlength
        give_up = False
        while not give_up:
            give_up = streamlength_snapshot.finished
            chunk = self._get_chunk(streamlength_snapshot, offset, max_end_offset)
            if chunk:
                return chunk
            # Wait for a writer to signal new data; give up on timeout
            if not self._read_ready.wait(timeout=600):
                break
            # Refresh the snapshot before re-checking for the chunk
            streamlength_snapshot = self.streamlength

        return StreamChunk(data=bytes(),
                           chunk_length=streamlength_snapshot.length,
                           finished=streamlength_snapshot.finished)


class MemoryStreamStorage(StreamStorageABC):
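    """In-memory implementation of ``StreamStorageABC``.

    Streams are kept in a plain dict, and per-stream reader counts are
    tracked in an ``OrderedDict`` so that reader-less streams can be cleaned
    up in insertion order once ``max_streams_count`` is reached.
    """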

    def __init__(self, max_streams_count: int = MAX_LOGSTREAMS_BEFORE_CLEANUP):
        self._logger = logging.getLogger(__name__)
        self._streams: Dict[str, _MemoryStream] = {}
        self._streams_readers: 'OrderedDict[str, int]' = OrderedDict()
        self._streams_readers_lock = Lock()
        self._streams_readers_changed_event = Event()
        self._max_stream_count = max_streams_count
        self._cleanup_max_batch_size = max_streams_count // 2

    def _lru_cleanup(self):
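        """Delete streams that currently have no readers.

        A no-op until the number of tracked streams reaches
        ``max_stream_count``; each run removes at most
        ``cleanup_max_batch_size`` streams.
        """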

        if len(self._streams_readers) < self._max_stream_count:
            return

        self._logger.info(f"Running LRU cleanup of unused LogStreams; "
                          f"max_stream_count=[{self._max_stream_count}], "
                          f"cleanup_max_batch_size=[{self._cleanup_max_batch_size}]")

        streams_to_remove = []
        changes = False
        with self._streams_readers_lock:
            # Figure out which streams to clean up by iterating through;
            # do not mutate the dictionary during iteration (Python
            # doesn't allow that)
            stream_iterator = islice(self._streams_readers.items(), 0, None)
            while len(streams_to_remove) < self._cleanup_max_batch_size:
                try:
                    stream_name, reader_count = next(stream_iterator)
                except StopIteration:
                    break

                if reader_count == 0:
                    streams_to_remove.append(stream_name)

            # Now loop through the list of all stream names
            # that were marked for removal and actually clean up
            for stream_name in streams_to_remove:
                self._delete_stream(stream_name, delete_readers=True)
                changes = True

        if len(streams_to_remove) == 0:
            self._logger.info("Cleaned up n=[0] LogStreams, "
                              "all LogStreams in use.")
        else:
            self._logger.info(f"Cleaned up n=[{len(streams_to_remove)}] LogStreams.")

        if changes:
            self._streams_readers_changed_event.set()
            self._streams_readers_changed_event.clear()

    def _create_stream(self, stream_handle: StreamHandle):
        if stream_handle.name in self._streams:
            raise StreamAlreadyExistsError(
                f'A LogStream with the parent [{stream_handle.name}] already exists, '
                'and should be used instead of creating a new stream.')
        self._lru_cleanup()
        self._streams[stream_handle.name] = _MemoryStream(stream_handle)
        self._streams_readers[stream_handle.name] = 0
        self._logger.debug(f'Stream created: {stream_handle}')

    def stream_exists(self, stream_name: str) -> bool:
        return stream_name in self._streams

    def new_client_streaming(self, stream_name: str):
        if stream_name in self._streams:
            with self._streams_readers_lock:
                self._streams_readers[stream_name] += 1
                self._streams_readers_changed_event.set()
                self._streams_readers_changed_event.clear()
        else:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist.')

    def streaming_client_left(self, stream_name: str):
        if stream_name in self._streams:
            with self._streams_readers_lock:
                self._streams_readers[stream_name] -= 1
                self._streams_readers_changed_event.set()
                self._streams_readers_changed_event.clear()
        else:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist.')

    def stream_finished(self, stream_name: str) -> bool:
        if stream_name in self._streams:
            return self._streams[stream_name].finished
        raise NotFoundError(
            f'A LogStream with the parent [{stream_name}] doesn\'t exist')

    def stream_length(self, stream_name: str) -> StreamLength:
        if stream_name in self._streams:
            return self._streams[stream_name].streamlength
        raise NotFoundError(
            f'A LogStream with the parent [{stream_name}] doesn\'t exist')

    def _append_to_stream(self, stream_name: str, write_resource_name: str,
                          message: Optional[bytes] = None,
                          *, mark_finished: bool = False) -> None:
        stream = self._streams.get(stream_name)
        if stream is None or stream.write_name != write_resource_name:
            raise NotFoundError(
                f'A LogStream with the parent [{write_resource_name}] doesn\'t exist')
        stream.append(message, mark_finished)

    def read_stream_chunk_blocking(
            self, stream_name: str, read_offset_bytes: int,
            read_limit_bytes: Optional[int] = None) -> StreamChunk:
        if stream_name not in self._streams:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist')
        return self._streams[stream_name].read_chunk_blocking(
            read_offset_bytes, read_limit_bytes)

    def read_stream_chunk(self, stream_name: str, read_offset_bytes: int,
                          read_limit_bytes: Optional[int] = None) -> StreamChunk:
        if stream_name not in self._streams:
            raise NotFoundError(
                f'A LogStream with the parent [{stream_name}] doesn\'t exist')
        return self._streams[stream_name].read_chunk(read_offset_bytes, read_limit_bytes)

    def _delete_stream(self, stream_name: str, delete_readers: bool = True):
        # Evict the actual stream from memory
        self._logger.debug(f"Deleting stream: {stream_name}")
        try:
            del self._streams[stream_name]
        except KeyError:
            # Already deleted
            pass
        # Evict the reader counter
        if delete_readers:
            try:
                del self._streams_readers[stream_name]
            except KeyError:
                pass

    def _wait_for_streaming_clients(self, stream_name: str, timeout: int) -> bool:
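        """Return whether ``stream_name`` has at least one streaming client,
        waiting up to ``timeout`` seconds for one to connect if needed.
        """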

        if stream_name in self._streams_readers:
            n_streaming = self._streams_readers.get(stream_name, 0)
            if n_streaming < 1:
                # Wait for the reader counts to change, then re-check
                self._streams_readers_changed_event.wait(timeout=timeout)
                try:
                    n_streaming = self._streams_readers[stream_name]
                except KeyError:
                    raise NotFoundError(
                        f'The LogStream with parent [{stream_name}] was deleted')
            return bool(n_streaming)
        raise NotFoundError(
            f'A LogStream with the parent [{stream_name}] doesn\'t exist')

    def delete_stream(self, write_resource_name: str):
        raise NotImplementedError()
290 raise NotImplementedError()