Coverage for waveqc/tasks.py: 0%
170 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-05-15 08:47 +0000
1import math
2from pathlib import Path
3from typing import Any
5import pendulum
6from celery import Celery, Task, chain, group
7from celery.schedules import crontab
8from celery.utils.log import get_task_logger
9from obspy import UTCDateTime, read
10from obspy.clients.fdsn.client import (
11 FDSNBadRequestException,
12 FDSNException,
13 FDSNNoDataException,
14)
15from obspy.core.inventory import read_inventory
16from obspy.core.stream import Stream
17from obspy.io.mseed import InternalMSEEDError
18from sqlalchemy import create_engine, or_, select
19from sqlalchemy.orm import (
20 joinedload,
21 scoped_session,
22 selectinload,
23 sessionmaker,
24)
26from .config import settings
27from .models import (
28 RESULT_DECONVOLUTION_FAILS,
29 RESULT_DECONVOLUTION_PASS,
30 RESULT_NO_DATA,
31 RESULT_NOT_READABLE,
32 Channel,
33 Check,
34 Network,
35 Station,
36)
37from .utils import MONITORED_NETWORKS, get_obspy_client
logger = get_task_logger(__name__)

# Maximum retries for each check
CHECK_MAX_RETRIES = 3

# We request waveforms for 1 day (86400 seconds)
CHECK_TIME_REACH = 86400

# Target sampling rate (Hz) used when decimating high-frequency waveforms
TARGET_SAMPLING_RATE = 5

# Channel naming: index of each code letter within a channel code
# See : https://www.fdsn.org/pdf/SEEDManual_V2.4.pdf p. 133
BAND_CODE = 0
INSTRUMENT_CODE = 1

# High frequency channels according to channel naming's band code
HIGH_FREQUENCY_CHANNELS = "FGDCESHB"

# Seismometer channels according to channel naming's instrument code
SEISMOMETER_CHANNELS = "HLGMN"

# Module-level engine and thread-local (scoped) session shared by all tasks;
# DatabaseTask commits/rolls back and removes the session after each task.
engine = create_engine(str(settings.PG_DSN))
dbsession = scoped_session(sessionmaker(bind=engine))
class DatabaseTask(Task):
    """Task base class that manages the module-level scoped SQLAlchemy session.

    Commits on task return and rolls back on failure, then removes the
    thread-local session so the next task on this worker starts fresh.
    """

    # NOTE: annotation fixed from `*_: list[Any]` — a `*args` annotation
    # applies to each positional argument, and Celery passes mixed types
    # (status, retval, task_id, args, kwargs, einfo), not lists.
    def after_return(self, *_: Any) -> None:
        # Persist whatever the task changed, then discard the session.
        dbsession.commit()
        dbsession.remove()

    def on_failure(self, *_: Any) -> None:
        # Undo partial changes from the failed task before discarding.
        dbsession.rollback()
        dbsession.remove()
app = Celery(
    "tasks",
    broker=str(settings.AMQP_DSN),
    result_backend=str(settings.REDIS_DSN),
    # All tasks use DatabaseTask so the scoped session is committed/removed
    # after every run.
    task_cls="waveqc.tasks.DatabaseTask",
)

# Acknowledge messages only after the task finishes, and requeue work lost
# to a dead worker, so checks are not silently dropped.
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
app.conf.worker_prefetch_multiplier = settings.CELERY_PREFETCH
app.conf.worker_max_tasks_per_child = settings.CELERY_MAX_TASKS_PER_CHILD
app.conf.beat_schedule_filename = str(settings.CELERY_BEAT_FILENAME)
app.conf.beat_schedule = {
    # Executes daily at 1:15 a.m.
    "launch-checks-daily": {
        "task": "waveqc.tasks.launch_checks",
        "schedule": crontab(hour="1", minute="15"),
        "args": (),
    },
    # Executes daily at 3:15 a.m.
    # 5:15 a.m.
    # 7:15 a.m.
    "retry-failing-checks-daily": {
        "task": "waveqc.tasks.retry_failing_checks",
        "schedule": crontab(hour="3,5,7", minute="15"),
        "args": (),
    },
}
@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def update_inventory(date: str) -> None:
    """Sync networks, stations, operators and channels from the FDSN service.

    Two passes: a station-level request updates networks/stations/operators,
    then a channel-level request (restricted to channels started before
    *date*) updates channel metadata for each stored network.
    """
    client = get_obspy_client()
    inventory = client.get_stations(
        network=",".join(MONITORED_NETWORKS),
        level="station",
    )
    # We need to update all stations first to get an end_date if available
    for inventory_network in inventory.networks:
        network = Network().populate(dbsession, inventory_network)
        network.populate_operators(dbsession, inventory_network.stations)
        network.populate_stations(dbsession, inventory_network.stations)
        network.link_operators_to_stations(dbsession, inventory_network.stations)
        network.purge_orphaned_operators(dbsession)
    target = UTCDateTime(date)
    # Second pass: channel-level inventory, limited to channels already
    # started at *date*.
    inventory = client.get_stations(
        network=",".join(MONITORED_NETWORKS),
        level="channel",
        startbefore=target,
    )
    for inventory_network in inventory.networks:
        # Stations are eagerly loaded because populate_channels needs them.
        network = dbsession.scalars(
            select(Network)
            .where(Network.code == inventory_network.code)
            .options(selectinload(Network.stations))
        ).one()
        network.populate_channels(dbsession, inventory_network, network.stations)
def check_completeness(stream: "Stream") -> int:
    """Return the amount of data present in *stream*, in milliseconds.

    Uses the sampling rate of the first trace for the whole stream.

    NOTE(review): the sample count is decremented twice (once when summing,
    once in the final expression) — confirm this double offset is intended.
    """
    total_samples = sum(trace.count() for trace in stream.traces) - 1
    sampling_rate = stream.traces[0].stats.sampling_rate
    return int((total_samples - 1) * 1000 / sampling_rate)
def check_shortest_trace(stream: "Stream") -> int:
    """Return the duration, in whole seconds rounded up, of the shortest trace.

    The sampling rate of the first trace is assumed to apply to all traces.
    """
    rate = stream.traces[0].stats.sampling_rate
    fewest_samples = min(len(trace) for trace in stream.traces) - 1
    return math.ceil(fewest_samples / rate)
def check_removable_response(
    stream: Stream,
    channel: str,
    network: str,
    station: str,
    count: int,
) -> int:
    """Check whether the instrument response can be removed from *stream*.

    Returns RESULT_DECONVOLUTION_PASS or RESULT_DECONVOLUTION_FAILS.  When
    the stream has more than one trace they are merged first (gaps filled
    with zeros); a merge failure counts as a deconvolution failure.
    Non-seismometer (environmental) channels pass without any processing.
    """
    if count > 1:
        try:
            stream.merge(fill_value=0)
        except Exception as message:  # noqa: BLE001
            # obspy refuses to merge traces with mismatched sampling rates.
            logger.warning(
                "Channel %s.%s.%s have traces with different sampling rates : %s",
                network,
                station,
                channel,
                message,
            )
            return RESULT_DECONVOLUTION_FAILS
    trace = stream.traces[0]

    # If instrument is environmental (RWD, etc...), do not try to remove response
    if channel[INSTRUMENT_CODE] in SEISMOMETER_CHANNELS:
        # decimate the trace only for high frequency channels
        if channel[BAND_CODE] in HIGH_FREQUENCY_CHANNELS:
            decimation_factor = int(trace.stats.sampling_rate / TARGET_SAMPLING_RATE)
            trace.decimate(decimation_factor, no_filter=True)
        # The StationXML is downloaded earlier in the workflow; a missing or
        # unusable response file fails the check.
        try:
            inventory = read_inventory(
                f"{settings.STATIONXML_STORAGE_PATH}/{network}_{station}.xml"
            )
            trace.remove_response(inventory=inventory)
        except (ValueError, FileNotFoundError):
            result = RESULT_DECONVOLUTION_FAILS
        else:
            result = RESULT_DECONVOLUTION_PASS
    else:
        result = RESULT_DECONVOLUTION_PASS
    return result
@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def check_item(mseed: str) -> None:
    """Run quality checks on a downloaded miniSEED file and store the result.

    The filename is expected to encode "NET.STA.LOC.CHA__start__end".  A
    missing file is recorded as RESULT_NO_DATA, an unreadable one as
    RESULT_NOT_READABLE; otherwise completeness, trace count, shortest-trace
    duration and response removal are checked.  The file is deleted at the
    end in every case.
    """
    completeness = count = shortest_trace = 0
    file = Path(mseed)
    nslc, start, _ = file.name.split("__")
    network, station, _, channel = nslc.split(".")
    # First 8 chars of the start token are the YYYYMMDD date.
    date = pendulum.parse(start[:8], exact=True)
    try:
        stream = read(file)
    except (TypeError, InternalMSEEDError):
        result = RESULT_NOT_READABLE
    except FileNotFoundError:
        result = RESULT_NO_DATA
    else:
        # obspy-internal stream cleanup before measuring — presumably merges
        # contiguous trace fragments; confirm against obspy docs if changed.
        stream._cleanup()  # noqa: SLF001
        completeness = check_completeness(stream)
        count = len(stream.traces)
        shortest_trace = check_shortest_trace(stream)
        result = check_removable_response(stream, channel, network, station, count)
    Channel().store_check_result(
        dbsession,
        nslc,
        date,  # type: ignore[arg-type]
        result,
        completeness,
        count,
        shortest_trace,
    )
    file.unlink(missing_ok=True)
@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def remove_stationxmls() -> None:
    """Delete every StationXML file from the storage directory.

    Only regular files are removed: ``Path.unlink`` raises on directories,
    and ``missing_ok=True`` tolerates entries deleted concurrently between
    listing and unlinking, so one stray entry cannot abort the cleanup.
    """
    basepath = Path(settings.STATIONXML_STORAGE_PATH)
    for entry in basepath.iterdir():
        if entry.is_file():
            entry.unlink(missing_ok=True)
@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_closed_channels_checks() -> None:
    """Delegate to Channel.fix_closed_channels_checks; the result is discarded."""
    Channel().fix_closed_channels_checks(dbsession)
@app.task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_missing_checks() -> None:
    """Delegate to Channel.fix_missing_checks; the result is discarded."""
    Channel().fix_missing_checks(dbsession)
@app.task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_check_list(date: str) -> list[dict[str, str]]:
    """Return one check descriptor per channel considered open on *date*.

    A channel qualifies when its station started on or before *date* and the
    channel either has no end date or ends after *date*.
    """
    target = pendulum.parse(date)
    query = (
        select(Channel)
        .join(Channel.station)
        .where(
            Station.start_date <= target,
            or_(
                Channel.end_date == None,  # noqa: E711
                Channel.end_date > target,
            ),
        )
        # Eager-load station and network so descriptor building emits no
        # extra queries per channel.
        .options(joinedload(Channel.station).joinedload(Station.network))
    )
    descriptors: list[dict[str, str]] = []
    for channel in dbsession.scalars(query).all():
        descriptors.append(
            {
                "channel": channel.code,
                "location": channel.location,
                "station": channel.station.code,
                "network": channel.station.network.code,
                "date": date,
            }
        )
    return descriptors
@app.task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def get_failed_checks() -> list[dict[str, Any]]:
    """Return descriptors for failing checks still under the retry limit.

    A check qualifies when its retry count is below CHECK_MAX_RETRIES and
    its result is at or below RESULT_DECONVOLUTION_FAILS.
    """
    query = (
        select(Check)
        .where(
            Check.retries < CHECK_MAX_RETRIES,
            Check.result <= RESULT_DECONVOLUTION_FAILS,
        )
        # Eager-load the channel/station/network chain used below.
        .options(
            joinedload(Check.channel)
            .joinedload(Channel.station)
            .joinedload(Station.network)
        )
    )
    descriptors: list[dict[str, Any]] = []
    for check in dbsession.scalars(query).all():
        descriptors.append(
            {
                "channel": check.channel.code,
                "location": check.channel.location,
                "station": check.channel.station.code,
                "network": check.channel.station.network.code,
                "date": check.date,
            }
        )
    return descriptors
@app.task
def download_mseed(check: dict[str, str]) -> str:
    """Download one day of waveforms described by *check* into MSEED storage.

    Returns the target filename in every case; when the request fails the
    file is simply not created, and FDSN errors are logged but swallowed so
    the surrounding group keeps running.
    """
    client = get_obspy_client()
    start = UTCDateTime(check["date"])
    end = start + CHECK_TIME_REACH
    nslc = (
        f"{check['network']}."
        f"{check['station']}."
        f"{check['location']}."
        f"{check['channel']}__"
    )
    filename = (
        f"{settings.MSEED_STORAGE_PATH}/{nslc}"
        f"{start.strftime('%Y%m%dT%H%M%SZ')}__"
        f"{end.strftime('%Y%m%dT%H%M%SZ')}"
        ".mseed"
    )

    try:
        client.get_waveforms(
            check["network"],
            check["station"],
            check["location"],
            check["channel"],
            start,
            end,
            filename=filename,
        )
    except FDSNNoDataException:
        # Nothing available for this day: leave no file behind.
        pass
    except FDSNBadRequestException:
        logger.warning(
            "Bad request : %s", ".".join([str(value) for value in check.values()])
        )
    except FDSNException:
        logger.warning(
            "Web Service is not responding : %s",
            ".".join([str(value) for value in check.values()]),
        )
    return filename
@app.task(bind=True)
def run_checks(self: DatabaseTask, checks: list[dict[str, Any]]) -> Any:  # noqa: ANN401
    """Replace this task with a group of download-then-check chains."""
    workflows = [
        chain(download_mseed.si(check), check_item.s()) for check in checks
    ]
    return self.replace(group(workflows))
@app.task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_station_list(date: str) -> list[str]:
    """Return "NET_STA" identifiers for stations considered open on *date*."""
    target = pendulum.parse(date)
    query = (
        select(Station)
        .where(
            Station.start_date <= target,
            or_(
                Station.end_date == None,  # noqa: E711
                Station.end_date > target,
            ),
        )
        # Eager-load the network so the identifier build emits no extra queries.
        .options(joinedload(Station.network))
    )
    identifiers = []
    for station in dbsession.scalars(query).all():
        identifiers.append(f"{station.network.code}_{station.code}")
    return identifiers
@app.task(bind=True)
def download_stationxmls(self: DatabaseTask, stations: list[str], date: str) -> Any:  # noqa: ANN401
    """Fan out one download_stationxml task per "NET_STA" identifier."""
    signatures = [download_stationxml.si(identifier, date) for identifier in stations]
    return self.replace(group(signatures))
@app.task(ignore_result=True)
def download_stationxml(station: str, date: str) -> None:
    """Download the response-level StationXML for one "NET_STA" identifier.

    Missing data is ignored silently; other FDSN errors are logged and
    swallowed so sibling downloads in the group keep running.
    """
    client = get_obspy_client()
    target = UTCDateTime(date)
    net_code, sta_code = station.split("_")
    try:
        client.get_stations(
            network=net_code,
            station=sta_code,
            level="response",
            startbefore=target,
            endafter=target,
            filename=(f"{settings.STATIONXML_STORAGE_PATH}/{net_code}_{sta_code}.xml"),
        )
    except FDSNNoDataException:
        pass
    except FDSNBadRequestException:
        logger.warning("Bad request : %s", sta_code)
    except FDSNException:
        logger.warning("Web Service is not responding : %s", sta_code)
@app.task(ignore_result=True)
def retry_failing_checks() -> None:
    """Re-run previously failed checks against yesterday's data.

    Same pipeline as launch_checks, except the work list comes from
    get_failed_checks instead of the full open-channel list.
    """
    date = pendulum.yesterday("utc").to_date_string()
    workflow = chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        get_failed_checks.si(),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )
    workflow()
@app.task(ignore_result=True)
def launch_checks() -> None:
    """Run the full daily QC pipeline for yesterday's data.

    Updates the inventory, downloads StationXMLs, builds the full check
    list, runs the checks, then cleans up and repairs check bookkeeping.
    """
    date = pendulum.yesterday("utc").to_date_string()
    workflow = chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        build_check_list.si(date),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )
    workflow()