# Source code for yam.stretch

# Copyright 2017-2023, Tom Eulenfeld, MIT license
"""
Stretch correlations

The results are returned in a dictionary with the following entries:

:times: strings of starttimes of the traces (1D array, length ``N1``)
:velchange_values: velocity changes (%) corresponding to the used stretching
    factors (assuming a homogeneous velocity change, 1D array, length ``N2``)
:tw: used lag time window
:sim_mat: similarity matrices (2D array, dimension ``(N1, N2)``)
:velchange_vs_time: velocity changes (%) as a function of time
    (value of highest correlation/similarity for each time, length ``N1``)
:corr_vs_time: correlation values as a function of time
    (value of highest correlation/similarity for each time, length ``N1``)
:attrs: dictionary with metadata
    (e.g. network, station, channel information of both stations,
    inter-station distance and parameters passed to the stretching function)

|
"""

import logging
import functools
import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline
from warnings import warn

from yam.util import _corr_id, _trim, _trim_time_period
import yam.stack

log = logging.getLogger('yam.stretch')


def _intersect_sorted(l1, l2):
    i = 0
    res = []
    for el in l1:
        while l2[i] < el:
            i += 1
        if l2[i] == el:
            res.append(el)
            i += 1
    return res


def _index_sorted(l1, l2):
    index = np.empty(len(l1), dtype=bool)
    i = 0
    for j, el in enumerate(l1):
        if len(l2) <= i:
            index[j:] = False
            break
        if l2[i] == el:
            index[j] = True
            i += 1
        else:
            index[j] = False
    return index


def _update_result(res):
    sim_mat = res['sim_mat']
    res['corr_vs_time'] = np.max(sim_mat, axis=1)
    argmax = np.argmax(sim_mat, axis=1)
    res['velchange_vs_time'] = res['velchange_values'][argmax]


def join_dicts(dicts):
    """Join list of dictionaries with stretching results

    :param dicts: list of result dictionaries (see module docstring)
    :return: single result dictionary with the individual results
        concatenated along the time axis (sorted by first time),
        or None for an empty list
    """
    # TODO: write test for this function
    if len(dicts) == 0:
        return
    elif len(dicts) == 1:
        return dicts[0]
    dicts = sorted(dicts, key=lambda d: d['times'][0])
    dim1 = sum(len(d['times']) for d in dicts)
    d = dicts[0]
    dim2 = len(d['velchange_values'])
    dtype = d['sim_mat'].dtype
    res = {'sim_mat': np.empty((dim1, dim2), dtype=dtype),
           'velchange_values': d['velchange_values'],
           'times': np.empty(dim1, dtype=d['times'].dtype),
           'velchange_vs_time': np.empty(dim1, dtype=dtype),
           'corr_vs_time': np.empty(dim1, dtype=dtype),
           # bug fix: stretch() and average_dicts() store the lag time
           # window under key 'tw' -- the former key 'lag_time_windows'
           # does not exist in their results and raised KeyError here
           'tw': d['tw'],
           # copy attrs so the caller's input dictionary is not mutated
           # when 'endtime' is overwritten below
           'attrs': dict(d['attrs'])}
    res['attrs']['endtime'] = dicts[-1]['attrs']['endtime']
    i = 0
    for d in dicts:
        j = i + len(d['times'])
        res['sim_mat'][i:j, :] = d['sim_mat']
        res['times'][i:j] = d['times']
        res['velchange_vs_time'][i:j] = d['velchange_vs_time']
        res['corr_vs_time'][i:j] = d['corr_vs_time']
        i = j
    return res
def average_dicts(dicts):
    """Average list of dictionaries with stretching results

    Similarity matrices are averaged over the times common to all
    results (e.g. to average over channel combinations).

    :param dicts: list of result dictionaries (see module docstring)
    :return: single result dictionary with averaged similarity matrix,
        or None for an empty list
    """
    # TODO: write test for this function
    if len(dicts) == 0:
        return
    elif len(dicts) == 1:
        return dicts[0]
    # keep only times present in every result
    times = functools.reduce(_intersect_sorted, [d['times'] for d in dicts])
    d = dicts[0]
    sim_mat = np.mean(
        [d['sim_mat'][_index_sorted(d['times'], times), :] for d in dicts],
        axis=0)
    res = {'sim_mat': sim_mat,
           'velchange_values': d['velchange_values'],
           'times': np.array(times),
           'tw': d['tw'],
           # bug fix: copy attrs -- previously the channel keys of the
           # caller's first input dictionary were overwritten in place
           'attrs': dict(d['attrs'])}
    # channels are undefined after averaging over channel combinations
    res['attrs']['channel1'] = '???'
    res['attrs']['channel2'] = '???'
    _update_result(res)
    return res
def _stretch_helper(data, refdata, mask, stretch_factor): N1, N2 = data.shape N3 = len(stretch_factor) assert N2 == len(refdata) # create a spline object for the reference trace # and evaluate the spline object at stretched times time_index_max = (N2 - 1) / 2 time_idx = np.linspace(-time_index_max, time_index_max, N2) ref_tr_spline = InterpolatedUnivariateSpline(time_idx, refdata) ref_stretch = np.empty((N3, N2)) for i in range(N3): ref_stretch[i, :] = ref_tr_spline(time_idx * stretch_factor[i]) # correlate, normalize and return cc = (data * mask) @ np.transpose(ref_stretch * mask) sq1 = np.sum((data * mask) ** 2, axis=1) sq2 = np.sum((ref_stretch * mask) ** 2, axis=1) norm = (sq1[:, np.newaxis] * sq2) ** 0.5 sim_mat = cc / norm sim_mat[np.isnan(sim_mat)] = 0 assert sim_mat.shape == (N1, N3) return sim_mat
def stretch(stream, max_stretch, num_stretch, tw, tw_relative=None,
            reftr=None, sides='both', max_lag=None, time_period=None):
    """
    Stretch traces in stream and return dictionary with results

    See e.g. Richter et al. (2015) for a description of the procedure.

    :param stream: |Stream| object with correlations
    :param float max_stretch: stretching range in percent
    :param int num_stretch: number of values in stretching vector
    :param tw: definition of the time window in the correlation --
        tuple of length 2 with start and end time in seconds (positive)
    :param tw_relative: time windows can be defined relative to a velocity,
        default None or 0 -- time windows relative to zero lag time,
        otherwise velocity is given in km/s
    :param reftr: reference trace, by default the stack of stream is used
        as reference
    :param sides: one of left, right, both
    :param max_lag: max lag time in seconds, stream is trimmed to
        ``(-max_lag, max_lag)`` before stretching
    :param time_period: use correlations only from this time span
        (tuple of dates)
    :return: dictionary with stretching results (see module docstring),
        or None if the stream is too short
    """
    if len(stream) <= 1:
        log.warning('For stretch the stream must have a minimum length of 2')
        return
    # drop traces whose id cannot be parsed; warn if mixed ids remain
    ids = {_corr_id(tr) for tr in stream}
    if None in ids:
        ids.discard(None)
        stream.traces = [tr for tr in stream if _corr_id(tr) is not None]
    if len(ids) != 1:
        warn('Different ids in stream: %s' % ids)
    # NOTE: the stream is modified in place (trimmed and sorted)
    _trim_time_period(stream, time_period)
    stream.sort()
    tr0 = stream[0]
    if max_lag is not None:
        for tr in stream:
            _trim(tr, (-max_lag, max_lag))
    # shift time window by inter-station travel time if a velocity is given
    rel = 0 if tw_relative is None else tr0.stats.dist / 1000 / tw_relative
    twabs = rel + np.array(tw)
    # lag times relative to the middle of the first trace
    starttime = tr0.stats.starttime
    mid = starttime + (tr0.stats.endtime - starttime) / 2
    times = tr0.times(reftime=mid)
    data = np.array([tr.data for tr in stream])
    data[np.isnan(data)] = 0  # bug fix
    data[np.isinf(data)] = 0
    if reftr is None:
        # default reference: stack of the whole stream
        reftr = yam.stack.stack(stream)[0]
    # stretching factors corresponding to velocity changes in percent
    stretch_vector_perc = np.linspace(-max_stretch, max_stretch, num_stretch)
    stretch_factor = 1 + stretch_vector_perc / 100
    # MIIC approximation:
    #stretch_factor = np.exp(stretch_vector_percent / 100)
    # mask1 selects the positive lag window, mask2 the negative lag window
    mask1 = np.logical_and(times >= twabs[0], times <= twabs[1])
    mask2 = np.logical_and(times <= -twabs[0], times >= -twabs[1])
    if sides == 'left':
        # NOTE(review): 'left' maps to the positive-lag mask1 and 'right'
        # to the negative-lag mask2 -- confirm this naming convention
        mask = mask1
    elif sides == 'right':
        mask = mask2
    elif sides == 'both':
        mask = np.logical_or(mask1, mask2)
    else:
        raise ValueError('sides = %s not a valid option' % sides)
    sim_mat = _stretch_helper(data, reftr.data, mask, stretch_factor)
    # starttimes as fixed-width byte strings (ISO format without subseconds)
    times = np.array([str(tr.stats.starttime)[:19] for tr in stream],
                     dtype='S19')
    result = {'sim_mat': sim_mat,
              'velchange_values': stretch_vector_perc,
              'times': times,
              'tw': twabs,
              'attrs': {'num_stretch': num_stretch,
                        'max_stretch': max_stretch,
                        'sides': sides,
                        'starttime': stream[0].stats.starttime,
                        # NOTE(review): 'endtime' is the starttime of the
                        # last trace, not its endtime -- confirm intended
                        'endtime': stream[-1].stats.starttime
                        }
              }
    # add 'corr_vs_time' and 'velchange_vs_time' entries
    _update_result(result)
    # copy station/channel metadata from the first trace if present
    for k in ('network1', 'network2', 'station1', 'station2',
              'location1', 'location2', 'channel1', 'channel2',
              'sampling_rate', 'dist', 'azi', 'baz'):
        if k in tr0.stats:
            result['attrs'][k] = tr0.stats[k]
    return result