Coverage for waveqc/tasks.py: 0%

170 statements  

coverage.py v7.4.4, created at 2024-05-15 08:47 +0000

import math
from pathlib import Path
from typing import Any

import pendulum
from celery import Celery, Task, chain, group
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from obspy import UTCDateTime, read
from obspy.clients.fdsn.client import (
    FDSNBadRequestException,
    FDSNException,
    FDSNNoDataException,
)
from obspy.core.inventory import read_inventory
from obspy.core.stream import Stream
from obspy.io.mseed import InternalMSEEDError
from sqlalchemy import create_engine, or_, select
from sqlalchemy.orm import (
    joinedload,
    scoped_session,
    selectinload,
    sessionmaker,
)

from .config import settings
from .models import (
    RESULT_DECONVOLUTION_FAILS,
    RESULT_DECONVOLUTION_PASS,
    RESULT_NO_DATA,
    RESULT_NOT_READABLE,
    Channel,
    Check,
    Network,
    Station,
)
from .utils import MONITORED_NETWORKS, get_obspy_client

logger = get_task_logger(__name__)


# Maximum retries for each check
CHECK_MAX_RETRIES = 3

# We request waveforms for 1 day (86400 seconds)
CHECK_TIME_REACH = 86400

# Used when decimating waveforms
TARGET_SAMPLING_RATE = 5

# Channel naming
# See: https://www.fdsn.org/pdf/SEEDManual_V2.4.pdf p. 133
BAND_CODE = 0
INSTRUMENT_CODE = 1

# High frequency channels according to channel naming's band code
HIGH_FREQUENCY_CHANNELS = "FGDCESHB"

# Seismometer channels according to channel naming's instrument code
SEISMOMETER_CHANNELS = "HLGMN"
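
# Illustration: SEED channel codes are three characters, e.g. "HHZ".
# channel[BAND_CODE] == "H" is the band code and
# channel[INSTRUMENT_CODE] == "H" the instrument code, so an "HHZ"
# channel counts here as both high frequency and seismometer.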


engine = create_engine(str(settings.PG_DSN))
dbsession = scoped_session(sessionmaker(bind=engine))


class DatabaseTask(Task):
    def after_return(self, *_: list[Any]) -> None:
        dbsession.commit()
        dbsession.remove()

    def on_failure(self, *_: list[Any]) -> None:
        dbsession.rollback()
        dbsession.remove()


app = Celery(
    "tasks",
    broker=str(settings.AMQP_DSN),
    result_backend=str(settings.REDIS_DSN),
    task_cls="waveqc.tasks.DatabaseTask",
)

app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
app.conf.worker_prefetch_multiplier = settings.CELERY_PREFETCH
app.conf.worker_max_tasks_per_child = settings.CELERY_MAX_TASKS_PER_CHILD
app.conf.beat_schedule_filename = str(settings.CELERY_BEAT_FILENAME)
app.conf.beat_schedule = {
    # Executes daily at 1:15 a.m.
    "launch-checks-daily": {
        "task": "waveqc.tasks.launch_checks",
        "schedule": crontab(hour="1", minute="15"),
        "args": (),
    },
    # Executes daily at 3:15 a.m., 5:15 a.m. and 7:15 a.m.
    "retry-failing-checks-daily": {
        "task": "waveqc.tasks.retry_failing_checks",
        "schedule": crontab(hour="3,5,7", minute="15"),
        "args": (),
    },
}


@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def update_inventory(date: str) -> None:
    client = get_obspy_client()
    inventory = client.get_stations(
        network=",".join(MONITORED_NETWORKS),
        level="station",
    )
    # We need to update all stations first to get an end_date if available
    for inventory_network in inventory.networks:
        network = Network().populate(dbsession, inventory_network)
        network.populate_operators(dbsession, inventory_network.stations)
        network.populate_stations(dbsession, inventory_network.stations)
        network.link_operators_to_stations(dbsession, inventory_network.stations)
        network.purge_orphaned_operators(dbsession)
    target = UTCDateTime(date)
    inventory = client.get_stations(
        network=",".join(MONITORED_NETWORKS),
        level="channel",
        startbefore=target,
    )
    for inventory_network in inventory.networks:
        network = dbsession.scalars(
            select(Network)
            .where(Network.code == inventory_network.code)
            .options(selectinload(Network.stations))
        ).one()
        network.populate_channels(dbsession, inventory_network, network.stations)
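
# Note: the FDSN `startbefore` parameter restricts the channel-level
# request above to channels whose epoch opened before `target`.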


def check_completeness(stream: Stream) -> int:
    # Sum the sample counts once; the offset is applied on the return line
    datapoints = sum(trace.count() for trace in stream.traces)
    return int((datapoints - 1) * 1000 / stream.traces[0].stats.sampling_rate)
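
# Worked example (illustrative figures): one full day at 100 Hz is
# 8_640_000 samples, so check_completeness reports
# (8_640_000 - 1) * 1000 / 100 == 86_399_990 milliseconds of data.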


def check_shortest_trace(stream: Stream) -> int:
    sampling_rate = stream.traces[0].stats.sampling_rate
    shortest_trace = min(len(trace) for trace in stream.traces) - 1
    return int(math.ceil(shortest_trace / sampling_rate))
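
# Illustration: if the shortest trace holds 500 samples at 100 Hz,
# check_shortest_trace returns ceil((500 - 1) / 100) == 5 seconds.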


def check_removable_response(
    stream: Stream,
    channel: str,
    network: str,
    station: str,
    count: int,
) -> int:
    if count > 1:
        try:
            stream.merge(fill_value=0)
        except Exception as message:  # noqa: BLE001
            logger.warning(
                "Channel %s.%s.%s has traces with different sampling rates: %s",
                network,
                station,
                channel,
                message,
            )
            return RESULT_DECONVOLUTION_FAILS
    trace = stream.traces[0]

    # If the instrument is environmental (RWD, etc.), do not try to remove the response
    if channel[INSTRUMENT_CODE] in SEISMOMETER_CHANNELS:
        # Decimate the trace only for high frequency channels
        if channel[BAND_CODE] in HIGH_FREQUENCY_CHANNELS:
            decimation_factor = int(trace.stats.sampling_rate / TARGET_SAMPLING_RATE)
            trace.decimate(decimation_factor, no_filter=True)

        try:
            inventory = read_inventory(
                f"{settings.STATIONXML_STORAGE_PATH}/{network}_{station}.xml"
            )
            trace.remove_response(inventory=inventory)
        except (ValueError, FileNotFoundError):
            result = RESULT_DECONVOLUTION_FAILS
        else:
            result = RESULT_DECONVOLUTION_PASS
    else:
        result = RESULT_DECONVOLUTION_PASS
    return result
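
# Illustration: for a 100 Hz high frequency channel the decimation factor
# above is int(100 / TARGET_SAMPLING_RATE) == 20, bringing the trace down
# to the 5 Hz target before response removal.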


@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def check_item(mseed: str) -> None:
    completeness = count = shortest_trace = 0
    file = Path(mseed)
    nslc, start, _ = file.name.split("__")
    network, station, _, channel = nslc.split(".")
    date = pendulum.parse(start[:8], exact=True)
    try:
        stream = read(file)
    except (TypeError, InternalMSEEDError):
        result = RESULT_NOT_READABLE
    except FileNotFoundError:
        result = RESULT_NO_DATA
    else:
        stream._cleanup()  # noqa: SLF001
        completeness = check_completeness(stream)
        count = len(stream.traces)
        shortest_trace = check_shortest_trace(stream)
        result = check_removable_response(stream, channel, network, station, count)
    Channel().store_check_result(
        dbsession,
        nslc,
        date,  # type: ignore[arg-type]
        result,
        completeness,
        count,
        shortest_trace,
    )
    file.unlink(missing_ok=True)


@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def remove_stationxmls() -> None:
    basepath = Path(settings.STATIONXML_STORAGE_PATH)
    for entry in basepath.iterdir():
        entry.unlink()


@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_closed_channels_checks() -> None:
    _ = Channel().fix_closed_channels_checks(dbsession)


@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_missing_checks() -> None:
    _ = Channel().fix_missing_checks(dbsession)


@app.task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_check_list(date: str) -> list[dict[str, str]]:
    day = pendulum.parse(date)
    checks = dbsession.scalars(
        select(Channel)
        .join(Channel.station)
        .where(
            Station.start_date <= day,
            or_(
                Channel.end_date == None,  # noqa: E711
                Channel.end_date > day,
            ),
        )
        .options(joinedload(Channel.station).joinedload(Station.network))
    ).all()
    return [
        {
            "channel": channel.code,
            "location": channel.location,
            "station": channel.station.code,
            "network": channel.station.network.code,
            "date": date,
        }
        for channel in checks
    ]
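
# Each returned item looks like this (illustrative values):
# {"channel": "HHZ", "location": "00", "station": "STA01",
#  "network": "FR", "date": "2024-05-14"}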


@app.task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def get_failed_checks() -> list[dict[str, Any]]:
    failed_checks = dbsession.scalars(
        select(Check)
        .where(
            Check.retries < CHECK_MAX_RETRIES,
            Check.result <= RESULT_DECONVOLUTION_FAILS,
        )
        .options(
            joinedload(Check.channel)
            .joinedload(Channel.station)
            .joinedload(Station.network)
        )
    ).all()
    return [
        {
            "channel": check.channel.code,
            "location": check.channel.location,
            "station": check.channel.station.code,
            "network": check.channel.station.network.code,
            "date": check.date,
        }
        for check in failed_checks
    ]


@app.task
def download_mseed(check: dict[str, str]) -> str:
    client = get_obspy_client()
    start = UTCDateTime(check["date"])
    end = start + CHECK_TIME_REACH
    filename = (
        f"{settings.MSEED_STORAGE_PATH}/"
        f"{check['network']}."
        f"{check['station']}."
        f"{check['location']}."
        f"{check['channel']}__"
        f"{start.strftime('%Y%m%dT%H%M%SZ')}__"
        f"{end.strftime('%Y%m%dT%H%M%SZ')}"
        ".mseed"
    )

    try:
        client.get_waveforms(
            check["network"],
            check["station"],
            check["location"],
            check["channel"],
            start,
            end,
            filename=filename,
        )
    except FDSNNoDataException:
        pass
    except FDSNBadRequestException:
        logger.warning(
            "Bad request: %s", ".".join([str(value) for value in check.values()])
        )
    except FDSNException:
        logger.warning(
            "Web Service is not responding: %s",
            ".".join([str(value) for value in check.values()]),
        )
    return filename
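
# Example (illustrative values): for a check dated 2024-05-14 the task
# writes FR.STA01.00.HHZ__20240514T000000Z__20240515T000000Z.mseed
# under MSEED_STORAGE_PATH.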


@app.task(bind=True)
def run_checks(self: DatabaseTask, checks: list[dict[str, Any]]) -> Any:  # noqa: ANN401
    return self.replace(
        group([chain(download_mseed.si(check), check_item.s()) for check in checks]),
    )
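
# Note: run_checks replaces itself with a group of two-step chains.
# download_mseed.si(check) is an immutable signature, so it only receives
# its own argument; check_item.s() then gets the filename it returns.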


@app.task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_station_list(date: str) -> list[str]:
    day = pendulum.parse(date)
    stations = dbsession.scalars(
        select(Station)
        .where(
            Station.start_date <= day,
            or_(
                Station.end_date == None,  # noqa: E711
                Station.end_date > day,
            ),
        )
        .options(joinedload(Station.network))
    ).all()
    return [f"{station.network.code}_{station.code}" for station in stations]


@app.task(bind=True)
def download_stationxmls(self: DatabaseTask, stations: list[str], date: str) -> Any:  # noqa: ANN401
    return self.replace(
        group([download_stationxml.si(station, date) for station in stations])
    )


@app.task(ignore_result=True)
def download_stationxml(station: str, date: str) -> None:
    client = get_obspy_client()
    target = UTCDateTime(date)
    network, station = station.split("_")
    try:
        client.get_stations(
            network=network,
            station=station,
            level="response",
            startbefore=target,
            endafter=target,
            filename=f"{settings.STATIONXML_STORAGE_PATH}/{network}_{station}.xml",
        )
    except FDSNNoDataException:
        pass
    except FDSNBadRequestException:
        logger.warning("Bad request: %s", station)
    except FDSNException:
        logger.warning("Web Service is not responding: %s", station)


@app.task(ignore_result=True)
def retry_failing_checks() -> None:
    date = pendulum.yesterday("utc").to_date_string()
    chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        get_failed_checks.si(),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )()


@app.task(ignore_result=True)
def launch_checks() -> None:
    date = pendulum.yesterday("utc").to_date_string()
    chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        build_check_list.si(date),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )()
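
# The two entry points differ only in how the work list is built:
# launch_checks checks every open channel for yesterday, while
# retry_failing_checks re-runs checks that still fail and have fewer
# than CHECK_MAX_RETRIES attempts.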