Coverage for quality/tasks.py: 56% (157 statements)

coverage.py v7.6.12, created at 2025-03-26 15:42 +0000

import math
import platform
from http.client import IncompleteRead
from pathlib import Path

import pendulum
from celery import chain, group, shared_task
from celery.app.task import Task
from celery.utils.log import get_task_logger
from django.conf import settings
from obspy import UTCDateTime, read
from obspy import __version__ as obspy_version
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.client import (
    FDSNBadRequestException,
    FDSNException,
    FDSNNoDataException,
)
from obspy.clients.fdsn.header import platform_
from obspy.core.inventory import read_inventory
from obspy.core.stream import Stream
from obspy.io.mseed import InternalMSEEDError

from .models import (
    Channel,
    Check,
    Network,
    Operator,
    Station,
)

logger = get_task_logger(__name__)


# We request waveforms for 1 day (86400 seconds)
CHECK_TIME_REACH = 86400

# Used when decimating waveforms
TARGET_SAMPLING_RATE = 5

# Channel naming
# See: https://www.fdsn.org/pdf/SEEDManual_V2.4.pdf p. 133
BAND_CODE = 0
INSTRUMENT_CODE = 1

# High frequency channels according to the channel naming's band code
HIGH_FREQUENCY_CHANNELS = "FGDCESHB"

# Seismometer channels according to the channel naming's instrument code
SEISMOMETER_CHANNELS = "HLGMN"

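# Illustrative example (editorial comment, not in the original source): these
# constants index into a SEED channel code. For a hypothetical code "HHZ",
# "HHZ"[BAND_CODE] == "H" (a high frequency band, listed in
# HIGH_FREQUENCY_CHANNELS) and "HHZ"[INSTRUMENT_CODE] == "H" (a seismometer,
# listed in SEISMOMETER_CHANNELS); check_removable_response() below uses both
# lookups to decide whether a trace should be decimated and deconvolved.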

def get_obspy_client() -> Client:
    user_agent = (
        f"WaveQC (ObsPy/{obspy_version}, {platform_}, "
        f"Python {platform.python_version()})"
    )
    return Client(
        settings.WAVEQC_FDSN_CLIENT, user_agent=user_agent, _discover_services=False
    )

@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def update_inventory(date: str) -> None:
    client = get_obspy_client()
    networks = ",".join([network.code for network in Network.objects.only("code")])
    inventory = client.get_stations(network=networks, level="station")
    # We need to update all stations first to get an end_date if available
    Network.objects.populate(inventory)
    Station.objects.populate(inventory)
    Operator.objects.populate(inventory)
    Operator.objects.link_to_stations(inventory)
    Operator.objects.purge_obsoletes()
    target = UTCDateTime(date)
    inventory = client.get_stations(
        network=networks, level="channel", startbefore=target
    )
    Channel.objects.populate(inventory)

@shared_task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_station_list(date: str) -> list[str]:
    day = pendulum.parse(date)
    stations = Station.objects.opened(day=day).select_related("network")  # type: ignore[arg-type]
    return [f"{station.network.code}_{station.code}" for station in stations]

@shared_task(ignore_result=True)
def download_stationxml(station: str, date: str) -> None:
    client = get_obspy_client()
    target = UTCDateTime(date)
    network, station = station.split("_")
    try:
        client.get_stations(
            network=network,
            station=station,
            level="response",
            startbefore=target,
            endafter=target,
            filename=(
                f"{settings.WAVEQC_STATIONXML_STORAGE_PATH}/{network}_{station}.xml"
            ),
        )
    except FDSNNoDataException as _:
        pass
    except FDSNBadRequestException as _:
        logger.warning("Bad request : %s", station)
    except (FDSNException, IncompleteRead, ValueError) as _:
        logger.warning("Web Service is not responding : %s", station)
    # This last exception shouldn't be here. Obspy should raise one of its own
    # exceptions. Once obspy 1.5.0 is released, remove this.
    except AttributeError as _:
        logger.warning("Web Service is not responding : %s", station)

@shared_task(bind=True)
def download_stationxmls(self: Task, stations: list[str], date: str) -> object:
    return self.replace(
        group([download_stationxml.si(station, date) for station in stations]),
    )

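# Note on the pattern above (editorial comment, not in the original source):
# Task.replace() substitutes the currently running task with the given
# signature inside its canvas, so download_stationxmls fans out into one
# download_stationxml subtask per station and the rest of the calling chain
# runs after the group completes. run_checks() below uses the same pattern
# for the per-channel checks.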

@shared_task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_check_list(date: str) -> list[dict[str, str]]:
    day = pendulum.parse(date)
    channels = Channel.objects.opened(day=day).select_related("station__network")  # type: ignore[arg-type]
    return [
        {
            "channel": channel.code,
            "location": channel.location,
            "station": channel.station.code,
            "network": channel.station.network.code,
            "date": date,
        }
        for channel in channels
    ]


@shared_task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def get_failed_checks() -> list[dict[str, object]]:
    failed_checks = Check.objects.failed().select_related("channel__station__network")
    return [
        {
            "channel": check.channel.code,
            "location": check.channel.location,
            "station": check.channel.station.code,
            "network": check.channel.station.network.code,
            "date": check.date,
        }
        for check in failed_checks
    ]

@shared_task
def download_mseed(check: dict[str, str]) -> str:
    client = get_obspy_client()
    start = UTCDateTime(check["date"])
    end = start + CHECK_TIME_REACH
    filename = (
        f"{settings.WAVEQC_MSEED_STORAGE_PATH}/"
        f"{check['network']}."
        f"{check['station']}."
        f"{check['location']}."
        f"{check['channel']}__"
        f"{start.strftime('%Y%m%dT%H%M%SZ')}__"
        f"{end.strftime('%Y%m%dT%H%M%SZ')}"
        ".mseed"
    )

    try:
        client.get_waveforms(
            check["network"],
            check["station"],
            check["location"],
            check["channel"],
            start,
            end,
            filename=filename,
        )
    except FDSNNoDataException as _:
        pass
    except FDSNBadRequestException as _:
        logger.warning(
            "Bad request : %s",
            ".".join([str(value) for value in check.values()]),
        )
    except (FDSNException, IncompleteRead, ValueError) as _:
        logger.warning(
            "Web Service is not responding : %s",
            ".".join([str(value) for value in check.values()]),
        )
    return filename

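# Illustrative example (hypothetical values, not from the original source):
# for a check {"network": "XX", "station": "STA01", "location": "00",
# "channel": "HHZ", "date": "2025-03-25"}, download_mseed() writes
#     <WAVEQC_MSEED_STORAGE_PATH>/XX.STA01.00.HHZ__20250325T000000Z__20250326T000000Z.mseed
# and check_item() below parses the NSLC code and the start date back out of
# that filename.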

def check_completeness(stream: Stream) -> int:
    datapoints = sum(trace.count() for trace in stream.traces) - 1
    return int((datapoints - 1) * 1000 / stream.traces[0].stats.sampling_rate)


def check_shortest_trace(stream: Stream) -> int:
    sampling_rate = stream.traces[0].stats.sampling_rate
    shortest_trace = min(len(trace) for trace in stream.traces) - 1
    return math.ceil(shortest_trace / sampling_rate)  # type: ignore[no-any-return]

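# Illustrative note (editorial comment, not in the original source): both
# helpers turn sample counts into durations. check_completeness() returns
# roughly the total recorded duration in milliseconds: for a hypothetical full
# day of 100 Hz data (8,640,000 samples) it yields
# int((8_640_000 - 2) * 1000 / 100) = 86,399,980 ms, close to the 86,400,000 ms
# covered by CHECK_TIME_REACH. check_shortest_trace() returns the duration of
# the shortest trace, rounded up to whole seconds.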

def check_removable_response(
    stream: Stream,
    channel: str,
    network: str,
    station: str,
    count: int,
) -> Check.Result:
    if count > 1:
        try:
            stream.merge(fill_value=0)
        except Exception as message:  # noqa: BLE001
            logger.warning(
                "Channel %s.%s.%s has traces with different sampling rates : %s",
                network,
                station,
                channel,
                message,
            )
            return Check.Result.DECONVOLUTION_FAILS
    trace = stream.traces[0]

    # If the instrument is environmental (RWD, etc.), do not try to remove the response
    if channel[INSTRUMENT_CODE] in SEISMOMETER_CHANNELS:
        # decimate the trace only for high frequency channels
        if channel[BAND_CODE] in HIGH_FREQUENCY_CHANNELS:
            decimation_factor = int(trace.stats.sampling_rate / TARGET_SAMPLING_RATE)
            trace.decimate(decimation_factor, no_filter=True)

        try:
            inventory = read_inventory(
                f"{settings.WAVEQC_STATIONXML_STORAGE_PATH}/{network}_{station}.xml",
            )
            trace.remove_response(inventory=inventory)
        except (ValueError, FileNotFoundError, TypeError):
            result = Check.Result.DECONVOLUTION_FAILS
        else:
            result = Check.Result.DECONVOLUTION_PASS
    else:
        result = Check.Result.DECONVOLUTION_PASS
    return result

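# Worked example (hypothetical numbers, editorial comment): for a 100 Hz "HHZ"
# channel, check_removable_response() computes
# decimation_factor = int(100 / TARGET_SAMPLING_RATE) = 20, so the trace is
# decimated from 100 Hz down to 5 Hz before the response is removed; an
# environmental channel such as "LDO" is not deconvolved at all and the check
# is marked as passing.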

@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def check_item(mseed: str) -> None:
    completeness = count = shortest_trace = 0
    file = Path(mseed)
    nslc, start, _ = file.name.split("__")
    network, station, _, channel = nslc.split(".")
    date = pendulum.parse(start[:8], exact=True)
    try:
        stream = read(file)
    except (TypeError, InternalMSEEDError):
        result = Check.Result.NOT_READABLE
    except FileNotFoundError:
        result = Check.Result.NO_DATA
    else:
        stream._cleanup()  # noqa: SLF001
        completeness = check_completeness(stream)
        count = len(stream.traces)
        shortest_trace = check_shortest_trace(stream)
        result = check_removable_response(stream, channel, network, station, count)
    file.unlink(missing_ok=True)
    Check.objects.store(
        nslc,
        date,  # type: ignore[arg-type]
        result,
        completeness,
        count,
        shortest_trace,
    )

@shared_task(bind=True)
def run_checks(self: Task, checks: list[dict[str, object]]) -> object:
    return self.replace(
        group([chain(download_mseed.si(check), check_item.s()) for check in checks]),
    )


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def remove_stationxmls() -> None:
    basepath = Path(settings.WAVEQC_STATIONXML_STORAGE_PATH)
    for entry in basepath.iterdir():
        entry.unlink()


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_closed_channels_checks() -> None:
    Check.objects.fix_closed_channels()


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_missing_checks() -> None:
    Check.objects.fix_missing()


@shared_task(ignore_result=True)
def retry_failing_checks() -> None:
    date = pendulum.yesterday("utc").to_date_string()
    chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        get_failed_checks.si(),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )()

@shared_task(ignore_result=True)
def launch_checks() -> None:
    date = pendulum.yesterday("utc").to_date_string()
    chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        build_check_list.si(date),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )()

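# Scheduling sketch (illustrative, not part of this module): launch_checks()
# and retry_failing_checks() take no arguments and build the whole pipeline
# themselves, so a Celery beat schedule along these lines could trigger them
# periodically. The entry names and times below are assumptions, not taken
# from the project settings.
#
#     from celery.schedules import crontab
#
#     app.conf.beat_schedule = {
#         "waveqc-launch-checks": {
#             "task": "quality.tasks.launch_checks",
#             "schedule": crontab(hour=2, minute=0),
#         },
#         "waveqc-retry-failing-checks": {
#             "task": "quality.tasks.retry_failing_checks",
#             "schedule": crontab(hour=14, minute=0),
#         },
#     }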