# Copyright 2017-2023 Tom Eulenfeld, MIT license
"""Commands used by the CLI interface"""
import functools
import glob
import logging
import multiprocessing
import os
import re
import shutil
import textwrap

import h5py
import numpy as np
import obspy
from obspy.core import UTCDateTime as UTC
import obspyh5
import tqdm

import yam
from yam.correlate import correlate
from import _get_existent, _iter_h5, _write_corr, read_dicts, write_dict
import yam.stack
import yam.stretch
from yam.util import _analyze_key, _filter, _get_fname, IterTime, ParseError

log = logging.getLogger('yam.commands')

def _todo_tasks(tasks, done_tasks):
    if len(tasks) == 0:
        log.warning('no tasks found -> nothing to do')
    new_tasks = sorted(set(tasks) - set(done_tasks))
    if len(new_tasks) < len(tasks):
        msg = '%d of %d tasks already processed -> skip these tasks', len(tasks) - len(new_tasks), len(tasks))
    return sorted(new_tasks)

[docs]def start_correlate(io, filter_inventory=None, startdate='1990-01-01', enddate='2020-01-01', njobs=None, parallel_inner_loop=False, keep_correlations=False, stack='1d', dataset_kwargs=None, **kwargs): """ Start correlation :param io: |io| :param filter_inventory: filter inventory with its select method, specified dict is passed to || :param str startdate,enddate: start and end date as strings : param njobs: number of cores to use for computation, days are computed parallel, this might consume much memory, default: None -- use all available cores, set njobs to 0 for sequential processing :param parallel_inner_loop: Run inner loops parallel instead of outer loop (preproccessing of different stations and correlation of different pairs versus processing of different days). Useful for a datset with many stations. :param dtype: data type for storing correlations (default: float16 - half precision) :param dataset_kwargs: options passed to obspyh5 resp. h5py when creating a new dataset, e.g. `dataset_kwargs={'compression':'gzip'}`. See create_dataset in h5py for more options. By default the dtype is set to `'float16'`. :param keep_correlations,stack,\*\*kwargs: all other kwargs are passed to `~yam.correlate.correlate()` function """ if dataset_kwargs is None: dataset_kwargs = {} if filter_inventory: log.debug('filter inventory') io['inventory'] = io['inventory'].select(**filter_inventory)'start preprocessing and correlation') tasks = [str(t)[:10] for t in IterTime(UTC(startdate), UTC(enddate))] done_tasks = None if stack is not None: key2 = kwargs['outkey'] + '_s' + stack done_tasks = [t[-16:-6] for t in _get_existent(io['stack'], key2)] if keep_correlations: key2 = kwargs['outkey'] done_tasks2 = [t[-16:-6] for t in _get_existent(io['corr'], key2)] if done_tasks is None: done_tasks = done_tasks2 else: done_tasks = [t for t in done_tasks if t in done_tasks2] tasks = _todo_tasks(tasks, done_tasks) tasks = [UTC(t) for t in tasks] kwargs.update({'keep_correlations': keep_correlations, 'stack': stack}) if parallel_inner_loop: kwargs['njobs'] = njobs njobs = 0 do_work = functools.partial(correlate, io, **kwargs) if njobs == 0:'do work sequentially') for task in tqdm.tqdm(tasks, total=len(tasks)): result = do_work(task) _write_corr(result, io, **dataset_kwargs) else: pool = multiprocessing.Pool(njobs)'do work parallel (%d cores)', pool._processes) for result in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)): _write_corr(result, io, **dataset_kwargs) pool.close() pool.join()'finished preprocessing and correlation')
def _stack_wrapper(groupnames, fname, outkey, **kwargs): """ Wrapper around `~yam.stack.stack()` :param groupnames: groups to load the correlations from :param fname: file to load correlations from :param outkey: key to write stacked correlations to :param \*\*kwargs: all other kwargs are passed to `~yam.stack.stack()` function """ with h5py.File(fname, 'r') as f: traces = [obspyh5.dataset2trace(f[g]) for g in groupnames] stream = obspy.Stream(traces) stack_stream = yam.stack.stack(stream, **kwargs) for tr in stack_stream: tr.stats.key = outkey return stack_stream
[docs]def start_stack(io, key, outkey, subkey='', njobs=None, starttime=None, endtime=None, dataset_kwargs=None, **kwargs): """ Start stacking :param io: |io| :param key: key to load correlations from :param outkey: key to write stacked correlations to :param subkey: only use a part of the correlations :param njobs: number of cores to use for computation, default: None -- use all available cores, set njobs to 0 for sequential processing :param starttime,endtime: constrain start and end dates :param dataset_kwargs: options passed to obspyh5 resp. h5py when creating a new dataset, e.g. `dataset_kwargs={'compression':'gzip'}`. See create_dataset in h5py for more options. By default the dtype is set to `'float16'`. :param \*\*kwargs: all other kwargs are passed to `yam.stack.stack()` function """ if dataset_kwargs is None: dataset_kwargs = {} dataset_kwargs.setdefault('dtype', 'float16') fname = io['stack'] if 's' in _analyze_key(key) else io['corr'] tasks = _get_existent(fname, key + subkey, 3) done_tasks = [t.replace(outkey, key) for t in _get_existent(io['stack'], outkey + subkey, 3)] tasks = _todo_tasks(tasks, done_tasks) length = kwargs.get('length') for task in tqdm.tqdm(tasks, total=len(tasks)): subtasks = [t for t in _get_existent(fname, task) if (starttime is None or t[-16:] >= starttime) and (endtime is None or t[-16:] <= endtime)] if length is None and njobs != 0: step = 1000 subtask_chunks = [tuple(subtasks[i:i + step]) for i in range(0, len(subtasks), step)] else: subtask_chunks = [subtasks] # TODO: parallel stacking for arbitrary stack id # lensec = _time2sec(length) # if lensec >= 30 * 24 * 3600: # subtask_chunks = [subtasks] # else: # subtask_chunks = [] # for i in range(0, len(subtasks), step): # chunk = subtasks[i:i + step] # t1 = UTC(subtasks[i + step - 1][-16:]) # j = 0 # while i + step + j < len(subtasks): # t2 = UTC(subtasks[i + step + j][-16:]) # # assume lensec is always larger than movesec # # not ideal, may load to much data # # eg for stack over 1 year # if t2 - t1 <= lensec: # chunk.append(subtasks[i + step + j]) # else: # break # j += 1 # subtask_chunks.append(chunk) do_work = functools.partial(_stack_wrapper, fname=fname, outkey=outkey, **kwargs) results = [] if njobs == 0 or len(subtask_chunks) == 1: log.debug('do work sequentially') for stask in tqdm.tqdm(subtask_chunks, total=len(subtask_chunks)): result = do_work(stask) results.append(result) else: pool = multiprocessing.Pool(njobs) log.debug('do work parallel (%d cores)', pool._processes) for result in tqdm.tqdm( pool.imap(do_work, subtask_chunks), total=len(subtask_chunks)): results.append(result) pool.close() pool.join() if length is None: for stream in results: assert len(stream) <= 1 traces = [tr for stream in results for tr in stream] num = sum(tr.stats.num for tr in traces) data = np.sum([ * (tr.stats.num / num) for tr in traces], axis=0) tr_stack = obspy.Trace(data, header=traces[0].stats) tr_stack.stats.num = num tr_stack.write(io['stack'], 'H5', mode='a', **dataset_kwargs) else: for stack_stream in results: stack_stream.write(io['stack'], 'H5', mode='a', **dataset_kwargs)
def _stretch_wrapper(groupnames, fname, outkey, filter=None, **kwargs): """ Wrapper around `~yam.stretch.stretch()` :param groupname: group to load the correlations from :param fname: file to load correlations from :param fname_stretch: file for writing results :param outkey: key to write stretch results to :param filter: filter correlations before stretching (bandpass, tuple with min and max frequency) :param \*\*kwargs: all other kwargs are passed to `~yam.stretch.stretch()` function """ with h5py.File(fname, 'r') as f: traces = [obspyh5.dataset2trace(f[g]) for g in groupnames] stream = obspy.Stream(traces) for tr in stream: = np.require(, 'float16') if filter: _filter(stream, filter) stretchres = yam.stretch.stretch(stream, **kwargs) if stretchres is not None: stretchres['attrs']['key'] = outkey return stretchres
[docs]def start_stretch(io, key, subkey='', njobs=None, reftrid=None, starttime=None, endtime=None, dataset_kwargs=None, **kwargs): """ Start stretching :param io: |io| :param key: key to load correlations from :param subkey: only use a part of the correlations :param njobs: number of cores to use for computation, default: None -- use all available cores, set njobs to 0 for sequential processing :param reftrid: Parallel processing is only possible when this parameter is specified. Key to load the reference trace from, e.g. `'c1_s'`, it can be created by a command similar to `yam stack c1 ''`. :param starttime,endtime: constrain start and end dates :param dataset_kwargs: options passed to obspyh5 resp. h5py when creating a new dataset, e.g. `dataset_kwargs={'compression':'gzip'}`. See create_dataset in h5py for more options. By default the dtype is set to `'float16'`. :param \*\*kwargs: all other kwargs are passed to `stretch_wrapper()` function """ if dataset_kwargs is None: dataset_kwargs = {} fname = _get_fname(io, key) outkey = kwargs['outkey'] tasks = _get_existent(fname, key + subkey, 3) done_tasks = [t.replace(outkey, key) for t in _get_existent(io['stretch'], outkey + subkey, 3)] tasks = _todo_tasks(tasks, done_tasks) for task in tqdm.tqdm(tasks, total=len(tasks)): if reftrid is None: reftr = None else: fname_reftr = _get_fname(io, reftrid) group_reftr = task.replace(key, reftrid) reftr =, 'H5', group=group_reftr, dtype='float16') if len(reftr) != 1: raise NotImplementedError('Reference must be single trace') reftr = reftr[0] subtasks = [t for t in _get_existent(fname, task) if (starttime is None or t[-16:] >= starttime) and (endtime is None or t[-16:] <= endtime)] if reftr is None: subtask_chunks = [tuple(subtasks)] else: step = 1000 subtask_chunks = [tuple(subtasks[i:i + step]) for i in range(0, len(subtasks), step)] do_work = functools.partial(_stretch_wrapper, fname=fname, reftr=reftr, **kwargs) results = [] if njobs == 0 or len(subtask_chunks) == 1: log.debug('do work sequentially') for stask in tqdm.tqdm(subtask_chunks, total=len(subtask_chunks)): result = do_work(stask) results.append(result) else: pool = multiprocessing.Pool(njobs) log.debug('do work parallel (%d cores)', pool._processes) for result in tqdm.tqdm( pool.imap(do_work, subtask_chunks), total=len(subtask_chunks)): results.append(result) pool.close() pool.join() result = yam.stretch.join_dicts(results) if result is not None: write_dict(result, io['stretch'], **dataset_kwargs)
def _start_ipy(obj): from IPython import start_ipython print('Contents loaded into obj variable.') start_ipython(argv=[], user_ns={'obj': obj}, display_banner=False) print('Good Bye') def _get_print2(): num, _ = shutil.get_terminal_size() if num == 0: num = 80 wrap = textwrap.TextWrapper(width=num - 1, initial_indent=' ', subsequent_indent=' ') def print2(text): print(wrap.fill(text)) return print2 def _get_data_glob(data): """ Construct a glob expression from the data expression """ return re.sub(r'{[^{}]*}', '*', data) def _get_data_files(data): """ Construct a glob expression from the data expression and return file names """ return glob.glob(_get_data_glob(data)) def _print_info_helper(key, io): max_count = 10000 print2 = _get_print2() is_stretch = key == 'tstretch' fname = _get_fname(io, key) keys = _get_existent(fname, '/', 1) # 1, 3, 4 if len(keys) == 0: print2('None') for key in sorted(keys): keys2 = _get_existent(fname, key, 3) subkey = key.split('/')[-1] if is_stretch: o = '%s: %d combs' % (subkey, len(keys2)) else: keys3 = _get_existent(fname, key, max_count=max_count+1) o = ('%s: %d combs, %s%s corrs' % (subkey, len(keys2), (len(keys3) > max_count) * '>', min(max_count, len(keys3)))) print2(o)
[docs]def info(io, key=None, subkey='', config=None, **unused_kwargs): """ Print information about yam project :param io: |io| :param key: key to print infos about (key inside HDF5 file, or one of data, stations, default: None -- print overview) :param subkey: only print part of the HDF5 file :param config: list of configuration dictionaries """ print2 = _get_print2() data_plugin = io.get('data_plugin') if key is None: print('Stations:') inventory = io['inventory'] if inventory is None: print2('Not found') else: stations = inventory.get_contents()['stations'] channels = inventory.get_contents()['channels'] print2(' '.join(st.strip().split()[0] for st in stations)) print2('%d stations, %d channels' % (len(stations), len(channels))) if data_plugin: print('Data plugin:') print2('%s' % data_plugin) else: print('Raw data (expression for day files):') print2(io['data']) print2('%d files found' % len(_get_data_files(io['data']))) print('Config ids:') def get_keys(d): if d is None or len(d) == 0: return 'None' else: return ', '.join(sorted(d.keys())) print2('c Corr: ' + get_keys(config[0])) print2('s Stack: ' + get_keys(config[1])) print2('t Stretch: ' + get_keys(config[2])) print('Correlations (channel combinations, correlations calculated):') _print_info_helper('corr', io) print('Stacks:') _print_info_helper('stack', io) print('Stretching matrices:') _print_info_helper('tstretch', io) elif key == 'stations': print(io['inventory']) elif key == 'data': if data_plugin: print('Data plugin:') print2('%s' % data_plugin) else: print('Raw data (expression for day files):') print2(io['data']) fnames = _get_data_files(io['data']) print2('%d files found' % len(fnames)) for fname in sorted(fnames): print2(fname) else: is_stretch = 't' in _analyze_key(key) fname = _get_fname(io, key) level = 3 if is_stretch else None for line in _get_existent(fname, key + subkey, level): print2(line)
def _load_data(seedid, day, data, data_format, key='data', **prep_kw): """Load preprocessed or raw data""" from obspy import UTCDateTime as UTC from yam.util import _seedid2meta from yam.correlate import get_data, preprocess smeta = _seedid2meta(seedid) day = UTC(day) if key == 'data': obj = get_data(smeta, data, data_format, day, overlap=0, edge=0, trim_and_merge=True) return obj stream = get_data(smeta, data, data_format, day, overlap=0, edge=60, trim_and_merge=False) preprocess(stream, day, **prep_kw) return stream
[docs]def load(io, key, seedid=None, day=None, do='return', prep_kw={}, fname=None, format=None): """ Load object and do something with it :param io: io :param key: key of object to load (key inside HDF5 file, or one of data, prepdata, stations) :param seedid: seed id of a channel (for data or prepdata) :param day: |UTC| object with day (for data or prepdata) :param do: specifies what to do with the object, default is ``'return'`` which simply returns the object, other possible values are ``'print'`` -- print object (used by print command), ``'load'`` -- load object in IPython session (used by load command), ``'export'`` -- export correlations to different file format (used by export command) :param dict prep_kw: options passed to preprocess (for prepdata only) :param fname: file name (for export command) :param format: target format (for export command) """ if key == 'stations': obj = io['inventory'] elif key in ('data', 'prepdata'): if seedid is None or day is None: msg = 'seedid and day must be given for data or prepdata' raise ParseError(msg) if key == 'prepdata': prep_keys = ('remove_response', 'remove_response_options', 'demean', 'filter', 'normalization', 'time_norm_options', 'spectral_whitening_options', 'tolerance_shift', 'downsample') prep_kw = {k: prep_kw.get(k) for k in prep_keys} obj = _load_data(seedid, day, io['data'], io.get('data_format'), key, inventory=io['inventory'], **prep_kw) else: is_stretch = 't' in _analyze_key(key) fname_in = _get_fname(io, key) if is_stretch: obj = read_dicts(fname_in, key) if do == 'print': obj = '\n\n'.join(str(o) for o in obj) else: obj =, 'H5', group=key, headonly=do == 'print') if do == 'print': obj = obj.__str__(extended=True) if do == 'print': print(obj) elif do == 'load': _start_ipy(obj) elif do == 'return': return obj elif do == 'export': print('export', obj) if format == 'H5': obspyh5.set_index() obj.write(fname, format) if format == 'H5': from import INDEX obspyh5.set_index(INDEX) else: raise
[docs]def plot(io, key, plottype=None, seedid=None, day=None, prep_kw={}, corrid=None, show=False, **kwargs): """ Plot everything :param io: |io| :param key: key of objects to plot, or one of stations, data, prepdata :param plottype: plot type to use (non default values are ``'vs_dist'`` and ``'wiggle'`` for correlation plots, ``'velocity'`` for plots of stretching results) :param seedid: seed id of a channel (for data or prepdata) :param day: |UTC| object with day (for data or prepdata) :param dict prep_kw: options passed to preprocess (for prepdata only) :param corrid: correlation configuration (for prepdata only) :param show: show interactive plot :param \*\*kwargs: all other kwargs are passed to the corresponding plot function in `~yam.imaging` module """ import yam.imaging path = io['plot'] if not os.path.exists(path): os.mkdir(path) if key in ('stations', 'data', 'prepdata'): pt = key else: is_corr = 't' not in _analyze_key(key) if is_corr and plottype == 'vs_dist': pt = 'corr_vs_dist' elif is_corr and plottype == 'wiggle': pt = 'corr_vs_time_wiggle' elif is_corr and plottype is None: pt = 'corr_vs_time' elif not is_corr and plottype is None: pt = 'sim_mat' elif not is_corr and plottype == 'velocity': pt = 'velocity_change' else: raise ParseError('Combination of key and plottype not supported') kw = kwargs.get('plot_%s_options' % pt, {}) kw.update(kwargs.get('plot_options', {})) bname = os.path.join(path, pt) if key == 'stations': yam.imaging.plot_stations(io['inventory'], bname, **kw) elif key in ('data', 'prepdata'): data = load(io, key, do='return', seedid=seedid, day=day, prep_kw=prep_kw) fname = bname + '_%s_%s' % (seedid, day) if key == 'prepdata': fname = fname + '_c' + corrid yam.imaging.plot_data(data, fname, show=show, **kw) else: plot_ = getattr(yam.imaging, 'plot_' + pt) if pt == 'corr_vs_dist': fname2 = _get_fname(io, key) stream =, 'H5', group=key) fname = bname + '_' + key.replace('/', '_') plot_(stream, fname, **kw) elif pt == 'velocity_change': results = [res for task, res in _iter_h5(io, key)] fname = bname + '_' + key.replace('/', '_') plot_(results, fname, **kw) else: for task, res in _iter_h5(io, key): fname = bname + task.replace('/', '_') plot_(res, fname, **kw) if show: from matplotlib.pyplot import show show()
[docs]def remove(io, keys): """ Remove one or several keys from HDF5 file :param io: |io| :param keys: list of keys to remove """ for key in keys: if '/' in key and key.split('/', 1) != '': from warnings import warn warn('It is highly encouraged to delete only top level keys') fname = _get_fname(io, key) with h5py.File(fname, 'a') as f: del f[key]