#!/usr/bin/env python
'''
file name: multiband_search_utilities.py
This file contains the utilities to be used for multi-frequency band search
'''
##########################
# import module
###########################
import os
import sys
import h5py
import numpy as np
import itertools as itrtls
import time
import concurrent.futures
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import pandas as pd
try:
from gwpy.timeseries import TimeSeries, TimeSeriesDict
except ImportError:
print('There is not gwpy module yet')
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import origli.utilities.const as Const # for multi frequency bands defined in
from origli.utilities.utilities import Multiprocess_whitening, PlotTableAnalysis
#import origli.utilities.veto_utilities # uncomment for multi-band search on WitnessFlag mode
[docs]def HierarchyChannelAboveThreshold_single_channel_multiband(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sampling_rate, sigma):
'''
description:
calculate values of importance: a fraction of frequency bins in a frequency range above an upper bound of the off-source window for a single channel,
in different frequency bnads
USAGE: Counts_in_multibands = HierarchyChannelAboveThreshold_single_channel_multiband(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sigma)
:param whitened_fft_target: whitened fft of the on-source window
:param whitened_fft_PBG: whitened fft of the preceding off-source window
:param whitened_fft_FBG: whitened fft of the following off-source window
:param duration: a duration of the on-source window
:param sampling_rate: sampling rate of a channel
:param sigma: an integer to determine the upper bound of the off-source window
:return: Counts_in_multibands: values of importance in different frequency bands, numpy array
'''
import origli.utilities.veto_utilities as veto_util # add it for multi-band search on WitnessFlag mode
# iterate different frequency bands in the list
Counts_in_multibands = np.array([veto_util.HierarchyChannelAboveThreshold_single_channel(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sampling_rate, sigma, LowerCutOffFreq=lower_freq_cut, UpperCutOffFreq=upper_freq_cut) for lower_freq_cut, upper_freq_cut in Const.freq_bands])
return Counts_in_multibands
[docs]def CreateRho_multiband(full_timeseries, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end, sigma):
'''
discription:
1. calculate the whitened FFT of the on- and off-source window for a single channel
2. compute the value of importance for a single channel
USAGE: Counts_in_multibands, sample_rate = CreateRho_multiband(full_timeseries, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end, sigma)
:param full_timeseries: the full time series in gwpy object including on- and off source windows
:param target_timeseries_start: the start time of the on-source window
:param target_timeseries_end: the end time of the on-source window
:param pre_background_start: the start time of the preceding off-source window
:param pre_background_end: the end time of the preceding off-source window
:param fol_background_start: the start time of the following off-source window
:param fol_background_end: the end time of the following off-source window
:param sigma: an interger to calculate the value of importance
:return:
Counts_in_multibands: values of importance in different frequency bands where importance is a fraction of frequency bins in a frequency range above an upper bound of the off-source window for a single channel
sample_rate: a sampling rate of a single channel
'''
# calculate the whitened FFT of the on- and off-source windows
whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, sample_rate, duration = Multiprocess_whitening(full_timeseries, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end)
# calculate the importance
Counts_in_multibands = HierarchyChannelAboveThreshold_single_channel_multiband(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sample_rate, sigma)
return Counts_in_multibands, sample_rate
[docs]def CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma):
'''
description:
1. use a single glitch time
2. query timeseries of all the channels around a glitch
3. condition (whitening and compare the on- and off-source window)
4. quantify all the channels (compute values of importance of all the channels)
USAGE: IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs, gpstime, duration, SNR, confi, ID = CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma)
:param Listsegment: a list of segment parameters
:param IFO
:param channels: a list of safe channels
:param number_process: a number of processes in parallel
:param PlusHOFT: whether to get data of hoft, {'True' or 'False'}
:param sigma: an integer to be used for calculating values of importance
:return:
IndexSatisfied: glitch index
Mat_Count_in_multibands: a matrix of rho where frequencies in rows and channels in columns, numpy array
list_sample_rates: a list of sampling rates of channels, numpy array
re_sfchs: list of channels without "IFO:" at the beginning
gpstime: a gps time
duration: a value of duration
SNR: signal to noise ratio in the h(t)
conf: a confidence level of classification of a glitch, provided by Gravity Spy. Otherwise None
ID: a glitch ID, provided by Gravity Spy in usual
'''
channels = ['{0}:{1}'.format(IFO, ch) for ch in re_sfchs]
IndexSatisfied, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end, gpstime, duration, SNR, confi, ID = Listsegment
t0 = time.time()
# extracting the target data and the proceeding and following backgrounds
try: # deal with time series in bad state
if pre_background_start != 0 and pre_background_end != 0 and fol_background_start != 0 and fol_background_end != 0: # if there is an available proceeding background and if there is an available following background
full_timeseriesDict = TimeSeriesDict.find(channels, pre_background_start, fol_background_end, frametype='{}_R'.format(IFO), verbose=False, nproc=number_process)
elif pre_background_start != 0 and pre_background_end != 0 and fol_background_start == 0 and fol_background_end == 0: # if there is an available proceeding background and if there is NO available following background
full_timeseriesDict = TimeSeriesDict.find(channels, pre_background_start, target_timeseries_end, frametype='{}_R'.format(IFO), verbose=False, nproc=number_process)
elif pre_background_start == 0 and pre_background_end == 0 and fol_background_start != 0 and fol_background_end != 0: # if there is NO available proceeding background and if there is an available following background
full_timeseriesDict = TimeSeriesDict.find(channels, target_timeseries_start, fol_background_end, frametype='{}_R'.format(IFO), verbose=False, nproc=number_process)
else:
pass
if PlusHOFT == 'True': # get the data of hoft
if pre_background_start != 0 and pre_background_end != 0 and fol_background_start != 0 and fol_background_end != 0: # if there is an available proceeding background and if there is an available following background
full_timeseriesDict_hoft = TimeSeriesDict.find(['{}:GDS-CALIB_STRAIN'.format(IFO)], pre_background_start, fol_background_end, frametype='{}_HOFT_C00'.format(IFO), verbose=False, nproc=number_process)
elif pre_background_start != 0 and pre_background_end != 0 and fol_background_start == 0 and fol_background_end == 0: # if there is an available proceeding background and if there is NO available following background
full_timeseriesDict_hoft = TimeSeriesDict.find(['{}:GDS-CALIB_STRAIN'.format(IFO)], pre_background_start, target_timeseries_end, frametype='{}_HOFT_C00'.format(IFO), verbose=False, nproc=number_process)
elif pre_background_start == 0 and pre_background_end == 0 and fol_background_start != 0 and fol_background_end != 0: # if there is NO available proceeding background and if there is an available following background
full_timeseriesDict_hoft = TimeSeriesDict.find(['{}:GDS-CALIB_STRAIN'.format(IFO)], target_timeseries_start, fol_background_end, frametype='{}_HOFT_C00'.format(IFO), verbose=False, nproc=number_process)
else:
pass
full_timeseriesDict.append(full_timeseriesDict_hoft) # append
del full_timeseriesDict_hoft # delete the hoft object
re_sfchs = np.append(re_sfchs, 'GDS-CALIB_STRAIN')
except IndexError as err:
print("IndexError: {}".format(err.message))
# IndexError: After reading object of type: FrAdcData stream is no longer in good state. ( Frmae: 0 ChannelType: L1:ASC-X_TR_A_NSUM_OUT_DQ offset: 932092099)
return 0, 0, 0, 0, 0, 0, 0, 0, 0
except RuntimeError as err:
print("RuntimeError: {}".format(err.message))
return 0, 0, 0, 0, 0, 0, 0, 0, 0
except IOError as err: # error when getting time series
print('IOError: {}'.format(err.message))
return 0, 0, 0, 0, 0, 0, 0, 0, 0
except ValueError: # ValueError: Failed to read /hdfs/frames/O3/raw/H1/H-H1_R-12384/H-H1_R-1238489600-64.gwf: Unable to map from the stream class id (0) to internal class id at position: 18446744073709551601 [streamref: class: 0 instance: 0 length: 0 ]
print('ValueError')
return 0, 0, 0, 0, 0, 0, 0, 0, 0
try:
Mat_Count_in_multibands = np.array([]) # a list of rho
list_sample_rates = np.array([])
# whitened target and BG segments with multi processes
with concurrent.futures.ProcessPoolExecutor(max_workers=number_process) as executor:
for Counts_in_multibands, sample_rate in executor.map(CreateRho_multiband,
full_timeseriesDict.values(),
itrtls.repeat(target_timeseries_start),
itrtls.repeat(target_timeseries_end),
itrtls.repeat(pre_background_start),
itrtls.repeat(pre_background_end),
itrtls.repeat(fol_background_start),
itrtls.repeat(fol_background_end),
itrtls.repeat(sigma)
):
# fill
#Mat_Count_in_multibands = np.vstack([Mat_Count_in_multibands, Counts_in_multibands]) if Mat_Count_in_multibands.size else Counts_in_multibands
Counts_in_multibands = Counts_in_multibands.reshape(Counts_in_multibands.shape[0], 1)
Mat_Count_in_multibands = np.hstack([Mat_Count_in_multibands, Counts_in_multibands]) if Mat_Count_in_multibands.size else Counts_in_multibands
list_sample_rates = np.append(list_sample_rates, sample_rate)
except ValueError:
print('*** delete group gps{:05d} due to ASD estimation error ***'.format(IndexSatisfied))
return 0, 0, 0, 0, 0, 0, 0, 0, 0
except IOError: # IOError: Driver write request failed
print('*** delete group gps{:05d} due to memory error ***'.format(IndexSatisfied))
return 0, 0, 0, 0, 0, 0, 0, 0, 0
t1 = time.time()
print('Time taken: {} s'.format(t1 - t0))
return IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs, gpstime, duration, SNR, confi, ID # pass the parameters
[docs]def SaveTargetAndBackGroundHDF5_multiband_OFFLINE(Listsegments, re_sfchs, IFO, outputpath, outputfilename, number_process, sigma, PlusHOFT='False'):
'''
description:
THIS IS USED FOR "OFFLINE" MODE
0. assuming Listsegments is given by Findglitchlist()
1.take the information of the list of allowed target and a preceding and following segments
2. whiten a target segmement based on the average background segment
3. find whitened FFT
4. save the whitened target and backgrounds FFTs
Note this depends on
USAGE: SaveTargetAndBackGroundHDF5(Listsegments, re_sfchs, IFO, outputpath, outputfilename, mode=='offline')
:param Listsegments: a list of segment parameters
:param channels: a list of safe channels
:param outputpath: a directory of an output file
:param outputfilename: a name of an output file
:param number_process: a number of processes in parallel
:param sigma: an integer to determine the upper bound of the off-source window
:param PlusHOFT: whether to get data of hoft, {'True' or 'False'}, 'False' in default
:return None
'''
# the path and the file name
outputPathFileName = os.path.join(outputpath, outputfilename)
# make an output file
try:
f = h5py.File(outputPathFileName, 'w-')
f.close()
except IOError:
print('The file already exists in {}'.format(outputPathFileName))
print('Please choose a different file name in the configuration file or delete this file if you want to overwrite it')
sys.exit()
# open a HDF5 file in writing mode
with h5py.File(outputPathFileName, 'r+') as f:
for Listsegment in Listsegments:
IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs_used, gpstime, duration, SNR, confi, ID = CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma)
if type(duration) == int: # if some error occurs, List_rho is 0, go to the next iteration
continue
else:
g = f.create_group('gps{:05d}'.format(IndexSatisfied))
g.attrs['GPS'] = gpstime # a peak GPS time
g.attrs['duration'] = duration
g.attrs['snr'] = SNR # snr
g.attrs['confidence'] = confi # confidence
g.attrs['id'] = ID
g.attrs['ifo'] = IFO
dset_mat_rho = g.create_dataset("mat_rho", data=Mat_Count_in_multibands, compression="gzip", compression_opts=9) # a matrix of rho
dset_sample_rates = g.create_dataset("sample_rates", data=list_sample_rates, compression="gzip", compression_opts=9) # list of sample rates of channels
dset_channels = g.create_dataset("channels", data=[re_sfch.encode('utf8') for re_sfch in re_sfchs_used], compression="gzip", compression_opts=9) # list of channels
freq_bands = np.array(Const.freq_bands).tolist()
for i in range(len(freq_bands)):
for j in range(len(freq_bands[0])):
freq_bands[i][j] = freq_bands[i][j].encode('utf8')
dset_freq_bands = g.create_dataset("freq_bands", data=freq_bands, compression="gzip", compression_opts=9) # matrix of frequency bnads
#dsetTarget.attrs['duration'] = List_DURATION[ch_label] # target
[docs]def SaveTargetAndBackGroundHDF5_multiband_ONLINE(Listsegments, re_sfchs, IFO, outputpath, outputfilename, number_process, sigma, PlusHOFT):
'''
description:
THIS IS USED FOR "ONLINE" MODE
0. assuming Listsegments is given by Findglitchlist()
1.take the information of the list of allowed target and a preceding and following segments
2. whiten a target segmement based on the average background segment
3. find whitened FFT
4. save the whitened target and backgrounds FFTs
Note this depends on Multiprocess_whitening()
USAGE: SaveTargetAndBackGroundHDF5(Listsegments, re_sfchs, IFO, outputpath, outputfilename, number_process, sigma, PlusHOFT)
:param Listsegments: a list of segment parameters
:param channels: a list of safe channels
:param outputpath: a directory of an output file
:param outputfilename: a name of an output file
:param number_process: a number of processes in parallel
:param sigma: an integer to determine the upper bound of the off-source window
:param PlusHOFT: whether to get data of hoft, {'True' or 'False'}
:return None
'''
# the path and the file name
outputPathFileName = os.path.join(outputpath, outputfilename)
# make an output file and open it
try:
f = h5py.File(outputPathFileName, 'w-')
f.close()
except IOError:
print('The file already exists in {}'.format(outputPathFileName))
print('Please choose a different file name in a configuration file or delete this file if you want to overwrite it')
sys.exit()
with h5py.File(outputPathFileName, 'r+') as f:
# =================
for Listsegment in Listsegments:
IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs_used, gpstime, duration, SNR, confi, ID = CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma)
if type(duration) == int: # if some error occurs, List_rho is 0, go to the next iteration
continue
else:
g = f.create_group('gps{:05d}'.format(IndexSatisfied))
g.attrs['GPS'] = gpstime # a peak GPS time
g.attrs['duration'] = duration
g.attrs['snr'] = SNR # snr
g.attrs['confidence'] = confi # confidence
g.attrs['id'] = ID
g.attrs['ifo'] = IFO
dset_mat_rho = g.create_dataset("mat_rho", data=Mat_Count_in_multibands, compression="gzip", compression_opts=9) # a matrix of rho
dset_sample_rates = g.create_dataset("sample_rates", data=list_sample_rates, compression="gzip", compression_opts=9) # list of sample rates of channels
dset_channels = g.create_dataset("channels", data=[re_sfch.encode('utf8') for re_sfch in re_sfchs_used], compression="gzip", compression_opts=9) # list of channels
freq_bands = np.array(Const.freq_bands).tolist()
for i in range(len(freq_bands)):
for j in range(len(freq_bands[0])):
freq_bands[i][j] = freq_bands[i][j].encode('utf8')
dset_freq_bands = g.create_dataset("freq_bands", data=freq_bands,compression="gzip", compression_opts=9) # matrix of frequency bnads
#dsetTarget.attrs['duration'] = List_DURATION[ch_label] # target
yield g # pass a group object as a generator
#+++++++++++++++++++++++++++++++++++++++++++++++
# plot tools
#+++++++++++++++++++++++++++++++++++++++++++++++
[docs]class PlotTableAnalysis_multiband():
def __init__(self):
self.f = None # a HDF5 file storing a glitch class
[docs] def getHDF5Object(self):
'''
show a HDF5 file object
USAGE: getHDF5Object()
:return: a dictionary
'''
if self.f == None:
print('A HDF5 file object is not set yet')
else:
return self.f
[docs] def setHDF5Object(self, input_dir, input_file):
'''
set a HDF5 file object
:param f_dict: a dictionary of time series data sets
:return: None
'''
input_path = os.path.join(input_dir, input_file)
f_input = h5py.File(input_path, 'r') # read it in the read mode
self.f = f_input
[docs] def CreateMatCount_multiband(self, g_Individual = None):
'''
description:
1. find counts for each glitch
2. stack over all the gliches
3. make a matix comprising importance versus channels
dependencies: self.HierarchyChannelAboveThreshold(g, LowerCutOffFreq, UpperCutOffFreq)
USAGE: MatCount, ListChannelName, ListSNR, ListConf, ListGPS, ListDuration, ListID, mat_rho, freq_bands, ListOriginalChannelName = CreateMatCount_multiband() # for all glitches
MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName = CreateMatCount_multiband(g_Individual) # for an individual glitch
:param g_Individual: a HDF5 file group for a glitch
:return:
MatCount: a matrix comprising importance versus channels
ListChannelName: a list of channel names, combining frequency band information
ListSNR: a list of SNRs
ListConf: a list of confidence
ListGPS: a list of GPS times
ListID: a list of IDs
mat_rho: a matrix of rho where frequencies in rows channels in columns, numpy array
freq_bands: a matrix of frequency bands
ListOriginalChannelName: a list of channel names, original
'''
if g_Individual == None: # for all the glitches in a glitch class
MatCount = np.array([])
ListSNR = np.array([])
ListConf = np.array([])
ListGPS = np.array([])
ListID = np.array([])
ListDuration = np.array([])
num_dataset = 0 # a number of datasets in a group, set to be 0 as a first instance
for g_label in list(self.f.keys()): # iterate through all the groups
try:
g = self.f[g_label]
except KeyError: # KeyError: 'Unable to open object (Bad object header version number)'
continue
# the following if statement is for the solution where no dataset in a group
if len(g.keys()) < num_dataset: # if a number of dataset in this group is less than that of the previous group
continue # go to the next iteration
num_dataset = len(g.keys()) # update a number of datasets with that of the current group
GPS = g.attrs['GPS'] # a GPS time
duration = g.attrs['duration'] # a value of duration in sec
SNR = g.attrs['snr'] # snr
confidence = g.attrs['confidence'] # confidence
ID = g.attrs['id'] # glich id
ListOriginalChannelName = [channel.decode('utf8') for channel in g['channels'][()]] # a list of channels
freq_bands = g['freq_bands'][()] # a matrix of frequency bands, e.g., np.array([['1', '128'], ['128', '256'], ...]
#--- covert data type of each element to string from udf8
freq_bands = freq_bands.tolist()
for i in range(len(freq_bands)):
for j in range(len(freq_bands[0])):
freq_bands[i][j] = freq_bands[i][j].decode('utf8')
freq_bands = np.array(freq_bands)
mat_rho = g['mat_rho'][()] # a matrix of rhos, in the structure of (# of channels, # freq bands)
ListChannelName = np.array([np.core.defchararray.add(ListOriginalChannelName, '_BP{}'.format(int(i))) for i in range(freq_bands.shape[0])]) # a matrix of channels with freq_band, frequency bands in rows in channels in columns
ListChannelName = ListChannelName.flatten(order='F') # list of revised channel names where ChannlA_BP0, ChannlA_BP1, ..., ChannlB_BP0
ListCount = mat_rho.flatten(order='F') # list of rhos, in the structure of (rho of channel A in band 0, rho of channel A in band 1, ..., rho of channel B in band 0)
# stack the vector of counts
MatCount = np.vstack([MatCount, ListCount]) if MatCount.size else ListCount
# append SNR
ListSNR = np.append(ListSNR, SNR)
# append confidence level
ListConf = np.append(ListConf, confidence)
# append GPS time
ListGPS = np.append(ListGPS, GPS)
# append ID
ListID = np.append(ListID, ID)
# append duration
ListDuration = np.append(ListDuration, duration)
return MatCount, ListChannelName, ListSNR, ListConf, ListGPS, ListDuration, ListID, mat_rho, freq_bands, ListOriginalChannelName
else: # individual glitch
MatCount = np.array([])
GPS = g_Individual.attrs['GPS'] # a GPS time
Duration = g_Individual.attrs['duration'] # a value of duration in sec
SNR = g_Individual.attrs['snr'] # snr
Conf = g_Individual.attrs['confidence'] # confidence
ID = g_Individual.attrs['id'] # glich id
ListOriginalChannelName = [channel.decode('utf8') for channel in g_Individual['channels'][()]] # a list of channels
freq_bands = g_Individual['freq_bands'][()] # a matrix of frequency bands, e.g., np.array([['1', '128'], ['128', '256'], ...]
# --- covert data type of each element to string from udf8
freq_bands = freq_bands.tolist()
for i in range(len(freq_bands)):
for j in range(len(freq_bands[0])):
freq_bands[i][j] = freq_bands[i][j].decode('utf8')
freq_bands = np.array(freq_bands)
mat_rho = g_Individual['mat_rho'][()] # a matrix of rhos, in the structure of (# of channels, # freq bands)
# make a matrix of channels with band information added to the end of the channel name, where frequency bands are in rows from top to bottom and channels in columes
ListChannelName = np.array([np.core.defchararray. add(ListOriginalChannelName, '_BP{}'.format(int(i))) for i in range(freq_bands.shape[0])])
ListChannelName = ListChannelName.flatten(order='F') # list of revised channel names where ChannlA_BP0, ChannlA_BP1, ..., ChannlB_BP0
ListCount = mat_rho.flatten(order='F') # list of rhos, in the structure of (rho of channel A in band 0, rho of channel A in band 1, ..., rho of channel B in band 0)
# stack the vector of counts
MatCount = np.vstack([MatCount, ListCount]) if MatCount.size else ListCount
return MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName
[docs] def PlotFrequencyVSChannel(self, glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file):
'''
description:
make a plot of frequencies versus channels for a glitch
dependencies: CreateChannelTicks()
USAGE: PlotFrequencyVSChannel(glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file)
:param glitchtype: a type of a glitch
:param SNR: SNR in h(t)
:param Conf: classification confidence level
:param GPS: a gps time
:param Duration: a duration of a gltich
:param ID: Gravity Spy ID
:param URL: Q-transform in h(t) of a glitch store in Gravity Spy
:param mat_rho: a matrix of rho where frequencies in rows channels in columns, numpy array
:param ListOriginalChannelName: a list of original channel names
:param freq_bands: a matrix of frequency bands
:param output_dir:
:param output_file:
:return: None
'''
# color the background either with blakc or gray for each channel
Listcolor = ['black', 'gray'] * 20 * 7
# get xticks for dominant channels
plotter = PlotTableAnalysis()
CenterTicks, ListInd, ListSubsys = plotter.CreateChannelTicks(ListOriginalChannelName)
fig = plt.figure()
# make grid for subplots
gs = GridSpec(9, 30)
# a subplot showing channel position
ax1 = fig.add_subplot(gs[:1, :-1]) # First row, up to the second last column
for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0):
if edgeRight == mat_rho.shape[-1] - 1: # adjust the right edge of the last color span
edgeRight = edgeRight + 1
ax1.axvspan(edgeLeft-0.5, edgeRight-0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None)
# limit the xaxis of the first subplot
ax1.axis(xmin=-0.5, xmax=round(mat_rho.shape[-1] * 1.001) -1 + 0.5)
# get rid of ticks of the first plot
ax1.yaxis.set_major_locator(plt.NullLocator())
ax1.xaxis.set_major_locator(plt.NullLocator())
ax1.set_title('{}, SNR = {:.02f}, confidence = {:.02f}, \n GPS = {:.02f}, ID = {}, duration = {} s\n Q-transform of h(t): {}'.format(glitchtype, SNR, Conf, GPS, ID, Duration, URL), fontsize=10)
# assign a size for the second subplot
ax2 = fig.add_subplot(gs[1:, :-1])
#MatCount = np.log10(MatCount+1)
mat_rho = mat_rho[::-1, ...] # flip the matrix around x axis such that it has order of band 0, band 1, band 2, ... from the bottom
CMAP = 'OrRd'
ax2.imshow(mat_rho, aspect='auto', interpolation='nearest', cmap=getattr(plt.cm, CMAP), origin='upper')
ax2.axis(xmin=-0.5, xmax=round(mat_rho.shape[-1] * 1.001) - 1 + 0.5)
ax2.set_xticks(CenterTicks)
ax2.set_xticklabels(ListSubsys[ListInd.astype(int)])
ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=5, length=0, labelrotation=90, size=0, labelsize=10)
# =============
# ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho
ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in freq_bands[::-1, ...]]
ListYlabels[0] = 'All' # change the name of None-None to All
ax2.set_yticks(range(freq_bands.shape[0]))
ax2.set_yticklabels(ListYlabels)
ax2.tick_params(axis="y", labelsize=5, labelrotation=45)
ax2.grid(False)
ax2.set_ylabel('Frequency band [Hz]', fontsize=15)
ax2.set_xlabel('Auxiliary channel', fontsize=15)
###### Make a color bar
#cmap = matplotlib.cm.viridis
norm = matplotlib.colors.Normalize(vmin=mat_rho.min(), vmax=mat_rho.max())
ax3 = fig.add_subplot(gs[1:, -1])
cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=getattr(plt.cm, CMAP), norm=norm, orientation='vertical', drawedges=False)
ax3.grid(False)
cbar.set_label(r'$q$', size=15)
########
fig.subplots_adjust(hspace=0)
output_path = os.path.join(output_dir, output_file)
fig.savefig(output_path)
plt.close()
print('Completed Saving a plot of frequency band VS channel')
# #-----------------------------------------------------------------+
# # make .csv file comprising GPS, importance of all the channels |
# #-----------------------------------------------------------------+
# # concatenate a list of GPS times and importance of all the channels
# MatGPSDurationImpotanceChannels = np.concatenate((ListGPS.reshape(ListGPS.shape[0], 1), ListDuration.reshape(ListDuration.shape[0], 1), MatCount), axis=1)
# # make a list of column names
# columnName = np.append(np.array(['GPS']), np.array(['duration']))
# columnName = np.append(columnName, ListChannelName)
# # convert it to a pandas frame
# dfGPSDurationImportanceChannels = pd.DataFrame(MatGPSDurationImpotanceChannels, columns=columnName)
# path_output_fileGPSImportanceChannels = os.path.join(output_dir, 'GPSDurationImportanceChannels.csv')
# # save it as .csv file
# dfGPSDurationImportanceChannels.to_csv(path_output_fileGPSImportanceChannels)
# print('Completed saving a table comprising GPS times, durations, and importance of the channels')
return None
[docs] def PlotIndividualFCS_ImportanceVSChannel_multiband(self, glitchtype, IFO, GravitySpy_df, output_dir, mode='offline', sigma=None, Listsegments=None, re_sfchs=None, Data_outputpath=None, Data_outputfilename=None, PlusHOFT='False', number_process=None):
'''
description:
1. load a file comprising all glitches in a class
2. create a plot comprising frequency versus channel & importance versus channel
- dependency:
self.CreateChannelTicks(ListChannel)
self.make_subset_channel_based_on_samplingrate()
self.CreateMatCount()
self.PlotImportanceVSChannel()
3. save a plot
dependencies: make_subset_channel_based_on_samplingrate(), CreateChannelTicks(), CreateMatCount(), PlotImportanceVSChannel()
USAGE: PlotIndividualFCS_ImportanceVSChannel_multiband(self, glitchtype, IFO, GravitySpy_df, output_dir, mode='offline', Listsegments=None, re_sfchs=None, Data_outputpath=None, Data_outputfilename=None, PlusHOFT='False', number_process=None)
:param glitchtype: a type of glitch used for create a name of a plot
:param IFO: # a type of IFO used in a name of a plot
:param GravitySpy_df: a meta data of Gravity Spy in pandas frame
:param output_dir: a output directory
:param simga: an integer number used for the upper bound of BG noise
:param mode: 'offline' or 'online'
:param sigma: an integer to determine the upper bound of the off-source window
:param Listsegments: a list of allowed glitches, which is used for online mode only, None in default
:param re_sfchs: a list of safe channels except unused channels, which is used for online mode only, None in default
:param Data_outputpath: a directory saving for a HDF5 file, which is used for online mode only, None in default
:param Data_outputfilename: a file saving for a HDF5 file, which is used for online mode only, None in default
:param PlusHOFT: whether to get data of HOFT {'True', 'False'}, which is used for online mode only, 'False' in default
:param number_process: a number of processes in parallel, which is used for online mode only, None in default
return None
'''
if mode == 'offline':
for group_label in list(self.f.keys()): # iterate though all glitches in a class
g = self.f[group_label]
if len(g.keys()) == 0: # if no dataset in this group, go to a next iteration
continue
# =========================
MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName = self.CreateMatCount_multiband(g)
# get a URL of Q-transform of h(t)
try:
URL = GravitySpy_df[GravitySpy_df['id'] == ID]['imgUrl'].tolist()[0]
except IndexError: # deal with the one with no Q-transport plot of h(t)
URL = 'None'
output_file = '{}.pdf'.format(str(GPS).replace('.', '-'))
self.PlotFrequencyVSChannel(glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file)
print('Completed plotting for a(n) {} glitch in {} at {}'.format(glitchtype, IFO, str(GPS)))
elif mode == 'online':
for g in SaveTargetAndBackGroundHDF5_multiband_ONLINE(Listsegments, re_sfchs, IFO, Data_outputpath, Data_outputfilename, number_process, sigma, PlusHOFT): # iterate group by group
# =========================
MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName = self.CreateMatCount_multiband(g)
# get a URL of Q-transform of h(t)
try:
URL = GravitySpy_df[GravitySpy_df['id'] == ID]['imgUrl'].tolist()[0]
except IndexError: # deal with the one with no Q-transport plot of h(t)
URL = 'None'
output_file = '{}.pdf'.format(str(GPS).replace('.', '-'))
self.PlotFrequencyVSChannel(glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file)
print('Completed plotting for a(n) {} glitch in {} at {}'.format(glitchtype, IFO, str(GPS)))
return None
[docs] def ReconstructFromFlattenedList(self, flattened_list, freq_bands=Const.freq_bands):
'''
description:
make the matrix of the original order from the flatten
USAGE: mat_originl, mat_flipped = ReconstructFromFlattenedList(flattened_list, freq_bands=Const.freq_bands)
:param flattened_list: flattend list from the matrix using np.flatten(order='F')
:param freq_bands: frequency bands
:return:
mat_originl: a matrix where frequency bnads in row from the top to bottom, and channels in columns
mat_flipped: a matrix where # frequency bnads in row from the bottom to top, and channels in columns
'''
num_freq_bands = len(freq_bands) # number of frquency bands
num_channels = int(flattened_list.shape[0] / num_freq_bands) # number of channels
mat_originl = flattened_list.reshape(num_freq_bands, num_channels, order='F') # frequency bnads in row from the top to bottom, and channels in columns
mat_flipped = mat_originl[::-1, ...] # frequency bnads in row from the bottom to top, and channels in columns
return mat_originl, mat_flipped
[docs] def PlotCausalityVSChannelMultiBandNoMaskNoTable(self, list_Causal_passed, list_Causal_fail, list_causal_passed_err, list_causal_failed_err, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence, freq_bands=Const.freq_bands):
'''
description:
plot witness ratio statistics of chanenls in multi-frequency bands
without mask and table
USAGE: PlotCausalityVSChannelMultiBandNoMaskNoTable(list_Causal_passed, list_Causal_fail, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence)
:param list_Causal_passed: a list of the probability of the causality that are passed one-tailed Binomial test, if not zero
:param list_Causal_fail: a list of the probability of the causality that are failed one-tailed Binomial test, if not zero
:param list_causal_passed_err: a list of the error of the causal probability that are passed one-tailed Binomial test, otherwise, zero
:param list_causal_failed_err: a list of the error of the causal probability that are failed one-tailed Binomial test, otherwise, zero
:param list_Test: a list of results of the Binomial test, 'pass' or 'fail'
:param ListChannelName: a list of channel names
:param output_dir: (only used for all glitches)
:param output_file: (only used for all glitches)
:param BinomialTestConfidence: binomial test confidence level
:param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py
:return: None
'''
plotter = PlotTableAnalysis()
# get the channel name matrix
MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(ListChannelName)
# get the channel name original
ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]])
# a list of the probability of the causality
list_Causal = list_Causal_passed + list_Causal_fail
# re-order the causal probabilities
MatCausal, MatCausalFlipped = self.ReconstructFromFlattenedList(list_Causal)
# re-order the statistical test
MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test)
# color the background either with blakc or gray for each channel
Listcolor = ['black', 'gray'] * 20 * 7
# get xticks for dominant channels
CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal)
fig = plt.figure()
# make grid for subplots
gs = GridSpec(9, 30)
# a subplot showing channel position
ax1 = fig.add_subplot(gs[:1, :-1]) # First row, up to the second last column
for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0):
if edgeRight == MatCausalFlipped.shape[-1] - 1: # adjust the right edge of the last color span
edgeRight = edgeRight + 1
ax1.axvspan(edgeLeft, edgeRight, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None)
# limit the xaxis of the first subplot
ax1.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1] * 0.001) -1 + 0.5)
# get rid of ticks of the first plot
ax1.yaxis.set_major_locator(plt.NullLocator())
ax1.xaxis.set_major_locator(plt.NullLocator())
# assign a size for the second subplot
ax2 = fig.add_subplot(gs[1:, :-1])
#MatCount = np.log10(MatCount+1)
#ax2.imshow(MatCausalFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
MatCausalFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatCausalFlipped)
ax2.imshow(MatCausalFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
ax2.set_xticks(CenterTicks)
ax2.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1] * 0.001) - 1 + 0.5)
ax2.set_xticklabels(ListSubsys[ListInd.astype(int)])
ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=5, length=0, labelrotation=90, size=0, labelsize=10)
# ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho
ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in freq_bands[::-1, ...]]
ListYlabels[0] = 'All' # change the name of None-None to All
ax2.set_yticks(range(len(freq_bands)))
ax2.set_yticklabels(ListYlabels)
ax2.tick_params(axis="y", labelsize=5, labelrotation=45)
ax2.grid(False)
ax2.set_ylabel('Frequency band [Hz]', fontsize=15)
ax2.set_xlabel('Auxiliary channel', fontsize=15)
###### Make a color bar
cmap = matplotlib.cm.viridis
norm = matplotlib.colors.Normalize(vmin=MatCausalFlipped.min(), vmax=MatCausalFlipped.max())
ax3 = fig.add_subplot(gs[1:, -1])
cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False)
ax3.grid(False)
cbar.set_label('Witness ratio statistic', size=10)
########
fig.subplots_adjust(hspace=0)
output_path = os.path.join(output_dir, output_file)
fig.savefig(output_path)
plt.close()
print('Completed Saving a plot of WRS in frequency band VS channel')
return None
[docs] def PlotCausalityVSChannelMultiBand(self, list_Causal_passed, list_Causal_fail, list_causal_passed_err, list_causal_failed_err, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence, freq_bands=Const.freq_bands):
'''
description:
plot witness ratio statistics of chanenls in multi-frequency bands
the cells which do not pass the test are masked
USAGE: PlotCausalityVSChannelMultiBand(list_Causal_passed, list_Causal_fail, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence)
:param list_Causal_passed: a list of the probability of the causality that are passed one-tailed Binomial test, if not zero
:param list_Causal_fail: a list of the probability of the causality that are failed one-tailed Binomial test, if not zero
:param list_causal_passed_err: a list of the error of the causal probability that are passed one-tailed Binomial test, otherwise, zero
:param list_causal_failed_err: a list of the error of the causal probability that are failed one-tailed Binomial test, otherwise, zero
:param list_Test: a list of results of the Binomial test, 'pass' or 'fail'
:param ListChannelName: a list of channel names
:param output_dir: (only used for all glitches)
:param output_file: (only used for all glitches)
:param BinomialTestConfidence: binomial test confidence level
:param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py
:return: None
'''
plotter = PlotTableAnalysis()
# get the channel name matrix
MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(ListChannelName)
# get the channel name original
ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]])
# a list of the probability of the causality
list_Causal = list_Causal_passed + list_Causal_fail
# re-order the causal probabilities
MatCausal, MatCausalFlipped = self.ReconstructFromFlattenedList(list_Causal)
# re-order the statistical test
MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test)
# color the background either with blakc or gray for each channel
Listcolor = ['black', 'gray'] * 20 * 7
# get xticks for dominant channels
CenterTicks, ListInd, ListSubsys = plotter.CreateChannelTicks(ListChannelNameOriginal)
fig = plt.figure()
# make grid for subplots
gs = GridSpec(13, 30)
# a subplot showing channel position
ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column
for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0):
if edgeRight == MatCausalFlipped.shape[-1] - 1: # adjust the right edge of the last color span
edgeRight = edgeRight + 1
ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None)
# ax1.set_edgecolor(None)
# limit the xaxis of the first subplot
ax1.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1]*1.001)-1+0.5)
# get rid of ticks of the first plot
ax1.yaxis.set_major_locator(plt.NullLocator())
ax1.xaxis.set_major_locator(plt.NullLocator())
# assign a size for the second subplot
ax2 = fig.add_subplot(gs[5:, :-1])
#ax2.imshow(MatCausalFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
MatCausalFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatCausalFlipped)
ax2.imshow(MatCausalFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper')
#ax2.imshow(MatCausalFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
ax2.set_xticks(CenterTicks)
ax2.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1] * 1.001) - 1+0.5)
ax2.set_xticklabels(ListSubsys[ListInd.astype(int)])
ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7)
# ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho
ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]]
ListYlabels[0] = 'All' # change the name of None-None to All
ax2.set_yticks(range(len(freq_bands)))
ax2.set_yticklabels(ListYlabels)
ax2.tick_params(axis="y", labelsize=5, labelrotation=45)
ax2.grid(False)
ax2.set_ylabel('Frequency band [Hz]', fontsize=15)
ax2.set_xlabel('Auxiliary channel', fontsize=15)
###### Make a color bar
cmap = matplotlib.cm.Oranges
norm = matplotlib.colors.Normalize(vmin=MatCausalFlipped.min(), vmax=MatCausalFlipped.max())
ax3 = fig.add_subplot(gs[5:, -1])
cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False)
ax3.grid(False)
cbar.set_label('Witness ratio statistic', size=10)
# +++++++++++ plot a table of ranking channels +++++++++++++++++++
# a list of the probability of the causality
#list_Causal = list_Causal_passed + list_Causal_fail
# list of the error of the probability of the causality
# list_causal_err = list_causal_passed_err + list_causal_failed_err
# sort channel based on the probability of the causality
# make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All"
list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands])
# As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ...
# A list of the corresponding frequency bands are created
# Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...]
# Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ...
list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), ListChannelName.shape[0], axis=1).flatten(order='F')
# get rid of the BD# at the end of the channel name to make it original name
ListChannelName = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in ListChannelName])
# sort the links based on values of statistic
list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_Causal, list_Test)
RankChannel = list(zip(ListChannelName[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_Causal[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail]))
# make pandas matrix
Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', 'WRS', 'Test{}'.format(BinomialTestConfidence)))
# define the table size
ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15]
# add rank label
Prodf = Prodf.assign(rank=np.arange(1, list_Causal_passed.shape[-1] + 1, 1))
# round importance
Prodf['WRS'] = Prodf['WRS'].astype(float).round(3) # = df.value1.round()
# re-sort columns to make rank labels coming first
cols = Prodf.columns.tolist()
cols = cols[-1:] + cols[:-1]
Prodf = Prodf[cols]
# take the first 5 high ranking channels
dcsummary = Prodf.iloc[:5]
# plot table on the top of the bar plot
table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths)
#table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths)
table.auto_set_font_size(False)
table.set_fontsize(6)
fig.subplots_adjust(hspace=0)
output_path = os.path.join(output_dir, output_file)
fig.savefig(output_path)
plt.close()
print('Completed Saving a plot of WRS in frequency band VS channel')
return None
[docs] def Plot_Welch_t_test_MultiBand(self, channels, list_t_values_passed, list_t_values_failed, list_Test, confidence_level, output_dir, output_file, freq_bands=Const.freq_bands):
'''
description:
plot the result of one-sided Welch t-test
USAGE: Plot_Welch_t_test(channels, list_t_values_passed, list_t_values_failed, list_Test, output_dir, output_file)
:param channels: a list of channels
:param list_t_values_passed: a list of t-values that pass the test
:param list_t_values_failed: a list of t-values that fail the test
:param list_Test: a list of the test results {'pass', 'fail'}
:param confidence_level: a confidence level
:param output_dir: an output directory
:param output_file: an output file name
:param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py
:return: None
'''
plotter = PlotTableAnalysis()
# get the channel name matrix
MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(channels)
# get the channel name original
ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]])
# a list of the probability of the causality
list_t_values = list_t_values_passed + list_t_values_failed
# re-order the causal probabilities
MatTvalue, MatTValueFlipped = self.ReconstructFromFlattenedList(list_t_values)
# re-order the statistical test
MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test)
# color the background either with blakc or gray for each channel
Listcolor = ['black', 'gray'] * 20 * 7
# get xticks for dominant channels
CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal)
fig = plt.figure()
# make grid for subplots
gs = GridSpec(13, 30)
# a subplot showing channel position
ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column
for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0):
if edgeRight == MatTValueFlipped.shape[-1] - 1: # adjust the right edge of the last color span
edgeRight = edgeRight + 1
ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None)
# limit the xaxis of the first subplot
ax1.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) -1 + 0.5)
# get rid of ticks of the first plot
ax1.yaxis.set_major_locator(plt.NullLocator())
ax1.xaxis.set_major_locator(plt.NullLocator())
# assign a size for the second subplot
ax2 = fig.add_subplot(gs[5:, :-1])
#ax2.imshow(MatTValueFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
MatTValueFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatTValueFlipped)
ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper')
#ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
ax2.set_xticks(CenterTicks)
ax2.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) - 1 + 0.5)
ax2.set_xticklabels(ListSubsys[ListInd.astype(int)])
ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7)
# ytick are named after the frequency band, this is flipped around x-axis to match the mat_rho
ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]]
ListYlabels[0] = 'All' # change the name of None-None to All
ax2.set_yticks(range(len(freq_bands)))
ax2.set_yticklabels(ListYlabels)
ax2.tick_params(axis="y", labelsize=5, labelrotation=45)
ax2.grid(False)
ax2.set_ylabel('Frequency band [Hz]', fontsize=15)
ax2.set_xlabel('Auxiliary channel', fontsize=15)
###### Make a color bar
cmap = matplotlib.cm.Oranges
norm = matplotlib.colors.Normalize(vmin=MatTValueFlipped.min(), vmax=MatTValueFlipped.max())
ax3 = fig.add_subplot(gs[5:, -1])
cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False)
ax3.grid(False)
cbar.set_label('t-value', size=10)
# +++++++++++ plot a table of ranking channels +++++++++++++++++++
# a list of the probability of the causality
#list_Causal = list_Causal_passed + list_Causal_fail
# list of the error of the probability of the causality
# list_causal_err = list_causal_passed_err + list_causal_failed_err
# sort channel based on the probability of the causality
# make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All"
list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands])
# As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ...
# A list of the corresponding frequency bands are created
# Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...]
# Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ...
list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1).flatten(order='F')
# get rid of the BD# at the end of the channel name to make it original name
channels = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in channels])
# sort the links based on values of statistic
list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_t_values, list_Test)
RankChannel = list(zip(channels[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_t_values[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail]))
# make pandas matrix
Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', 't-value', 'Test{}'.format(confidence_level)))
# define the table size
ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15]
# add rank label
Prodf = Prodf.assign(rank=np.arange(1, list_t_values.shape[-1] + 1, 1))
# round importance
Prodf['t-value'] = Prodf['t-value'].astype(float).round(3) # = df.value1.round()
# re-sort columns to make rank labels coming first
cols = Prodf.columns.tolist()
cols = cols[-1:] + cols[:-1]
Prodf = Prodf[cols]
# take the first 5 high ranking channels
dcsummary = Prodf.iloc[:5]
# plot table on the top of the bar plot
table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths)
#table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths)
table.auto_set_font_size(False)
table.set_fontsize(6)
fig.subplots_adjust(hspace=0)
output_path = os.path.join(output_dir, output_file)
fig.savefig(output_path)
plt.close()
print('Completed Saving a plot of t-value in frequency band VS channel')
return None
[docs] def Plot_WRS_Welch_t_test_MultiBand(self, channels, list_Causal_passed, list_Causal_fail, list_Test_binomial, list_t_values_passed, list_t_values_failed, list_t_Test, confidence_level, output_dir, output_file, freq_bands=Const.freq_bands):
'''
description:
plot the result of one-sided Welch t-test
USAGE: Plot_Welch_t_test(channels, list_t_values_passed, list_t_values_failed, list_Test, output_dir, output_file)
:param channels: a list of channels
:param list_t_values_passed: a list of t-values that pass the test
:param list_t_values_failed: a list of t-values that fail the test
:param list_Test: a list of the test results {'pass', 'fail'}
:param confidence_level: a confidence level
:param output_dir: an output directory
:param output_file: an output file name
:param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py
:return: None
'''
plotter = PlotTableAnalysis()
# get the channel name matrix
MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(channels)
# get the channel name original
ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]])
# a list of the probability of the t-values
list_t_values = list_t_values_passed + list_t_values_failed
# a list of the probability of the causality
list_Causal = list_Causal_passed + list_Causal_fail
# make a list of WRS times t-value
list_WRS_t_values = list_Causal * list_t_values
# re-order the WRS * t-value
MatWRSTvalue, MatWRSTValueFlipped = self.ReconstructFromFlattenedList(list_WRS_t_values)
# make test results combining binomial and t-test
list_Test = np.array([])
for test_binomial, t_test in zip(list_Test_binomial, list_t_Test):
if test_binomial == 'pass' and t_test == 'pass':
list_Test = np.append(list_Test, "pass")
else:
list_Test = np.append(list_Test, "fail")
# re-order the t-test
MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test)
# color the background either with blakc or gray for each channel
Listcolor = ['black', 'gray'] * 20 * 7
# get xticks for dominant channels
CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal)
fig = plt.figure()
# make grid for subplots
gs = GridSpec(13, 30)
# a subplot showing channel position
ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column
for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0):
if edgeRight == MatWRSTValueFlipped.shape[-1] - 1: # adjust the right edge of the last color span
edgeRight = edgeRight + 1
ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None)
# limit the xaxis of the first subplot
ax1.axis(xmin=-0.5, xmax=round(MatWRSTValueFlipped.shape[-1] * 1.001) - 1 + 0.5)
# get rid of ticks of the first plot
ax1.yaxis.set_major_locator(plt.NullLocator())
ax1.xaxis.set_major_locator(plt.NullLocator())
# assign a size for the second subplot
ax2 = fig.add_subplot(gs[5:, :-1])
#ax2.imshow(MatWRSTValueFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
MatWRSTValueFlippedMasked = np.ma.masked_where(MatTestFlipped=='fail', MatWRSTValueFlipped)
ax2.imshow(MatWRSTValueFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper')
#ax2.imshow(MatWRSTValueFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
ax2.set_xticks(CenterTicks)
ax2.axis(xmin=-0.5, xmax=round(MatWRSTValueFlipped.shape[-1] * 1.001) - 1 + 0.5)
ax2.set_xticklabels(ListSubsys[ListInd.astype(int)])
ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7)
# ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho
ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]]
ListYlabels[0] = 'All' # change the name of None-None to All
ax2.set_yticks(range(len(freq_bands)))
ax2.set_yticklabels(ListYlabels)
ax2.tick_params(axis="y", labelsize=5, labelrotation=45)
ax2.grid(False)
ax2.set_ylabel('Frequency band [Hz]', fontsize=15)
ax2.set_xlabel('Auxiliary channel', fontsize=15)
###### Make a color bar
cmap = matplotlib.cm.Oranges
norm = matplotlib.colors.Normalize(vmin=MatWRSTValueFlipped.min(), vmax=MatWRSTValueFlipped.max())
ax3 = fig.add_subplot(gs[5:, -1])
cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False)
ax3.grid(False)
cbar.set_label('WRS * t-value', size=10)
# +++++++++++ plot a table of ranking channels +++++++++++++++++++
# a list of the probability of the causality
#list_Causal = list_Causal_passed + list_Causal_fail
# list of the error of the probability of the causality
# list_causal_err = list_causal_passed_err + list_causal_failed_err
# sort channel based on the probability of the causality
# make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All"
list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands])
# As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ...
# A list of the corresponding frequency bands are created
# Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...]
# Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ...
list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0], 1), channels.shape[0], axis=1).flatten(order='F')
# get rid of the BD# at the end of the channel name to make it original name
channels = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in channels])
# sort the links based on values of statistic
list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_WRS_t_values, list_Test)
RankChannel = list(zip(channels[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_WRS_t_values[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail]))
# make pandas matrix
Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', 'WRS*t-value', 'Test{}'.format(confidence_level)))
# define the table size
ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15]
# add rank label
Prodf = Prodf.assign(rank=np.arange(1, list_WRS_t_values.shape[-1] + 1, 1))
# round importance
Prodf['WRS*t-value'] = Prodf['WRS*t-value'].astype(float).round(3) # = df.value1.round()
# re-sort columns to make rank labels coming first
cols = Prodf.columns.tolist()
cols = cols[-1:] + cols[:-1]
Prodf = Prodf[cols]
# take the first 5 high ranking channels
dcsummary = Prodf.iloc[:5]
# plot table on the top of the bar plot
table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths)
#table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths)
table.auto_set_font_size(False)
table.set_fontsize(6)
fig.subplots_adjust(hspace=0)
output_path = os.path.join(output_dir, output_file)
fig.savefig(output_path)
plt.close()
print('Completed Saving a plot of WRS*t-value in frequency band VS channel')
return None
[docs] def Plot_p_greater_MultiBand(self, channels, list_p_greater_passed, list_p_greater_failed, list_Test, confidence_level, output_dir, output_file, freq_bands=Const.freq_bands):
'''
description:
plot the result of p_greater
USAGE: Plot_p_greater_MultiBand(self, channels, list_p_greater_passed, list_p_greater_failed, list_Test, confidence_level, output_dir, output_file)
:param channels: a list of channels
:param list_p_greater_passed: a list of p_greater that pass the test
:param list_p_greater_failed: a list of p_greater that fail the test
:param list_Test: a list of the test results {'pass', 'fail'}
:param confidence_level: a confidence level
:param output_dir: an output directory
:param output_file: an output file name
:param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py
:return: None
'''
plotter = PlotTableAnalysis()
# get the channel name matrix
MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(channels)
# get the channel name original
ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]])
# a list of the probability of the causality
list_p_greater = list_p_greater_passed + list_p_greater_failed
# re-order the causal probabilities
MatTvalue, MatTValueFlipped = self.ReconstructFromFlattenedList(list_p_greater)
# re-order the statistical test
MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test)
# color the background either with blakc or gray for each channel
Listcolor = ['black', 'gray'] * 20 * 7
# get xticks for dominant channels
CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal)
fig = plt.figure()
# make grid for subplots
gs = GridSpec(13, 30)
# a subplot showing channel position
ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column
for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0):
if edgeRight == MatTValueFlipped.shape[-1] - 1: # adjust the right edge of the last color span
edgeRight = edgeRight + 1
ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None)
# limit the xaxis of the first subplot
ax1.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) -1 + 0.5)
# get rid of ticks of the first plot
ax1.yaxis.set_major_locator(plt.NullLocator())
ax1.xaxis.set_major_locator(plt.NullLocator())
# assign a size for the second subplot
ax2 = fig.add_subplot(gs[5:, :-1])
#ax2.imshow(MatTValueFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
MatTValueFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatTValueFlipped)
ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper')
#ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper')
ax2.set_xticks(CenterTicks)
ax2.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) - 1 + 0.5)
ax2.set_xticklabels(ListSubsys[ListInd.astype(int)])
ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7)
# ytick are named after the frequency band, this is flipped around x-axis to match the mat_rho
ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]]
ListYlabels[0] = 'All' # change the name of None-None to All
ax2.set_yticks(range(len(freq_bands)))
ax2.set_yticklabels(ListYlabels)
ax2.tick_params(axis="y", labelsize=5, labelrotation=45)
ax2.grid(False)
ax2.set_ylabel('Frequency band [Hz]', fontsize=15)
ax2.set_xlabel('Auxiliary channel', fontsize=15)
###### Make a color bar
cmap = matplotlib.cm.Oranges
norm = matplotlib.colors.Normalize(vmin=MatTValueFlipped.min(), vmax=MatTValueFlipped.max())
ax3 = fig.add_subplot(gs[5:, -1])
cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False)
ax3.grid(False)
cbar.set_label(r'$p_g$', size=10)
# +++++++++++ plot a table of ranking channels +++++++++++++++++++
# a list of the probability of the causality
#list_Causal = list_Causal_passed + list_Causal_fail
# list of the error of the probability of the causality
# list_causal_err = list_causal_passed_err + list_causal_failed_err
# sort channel based on the probability of the causality
# make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All"
list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands])
# As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ...
# A list of the corresponding frequency bands are created
# Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...]
# Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ...
list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1).flatten(order='F')
# get rid of the BD# at the end of the channel name to make it original name
channels = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in channels])
# sort the links based on values of statistic
list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_p_greater, list_Test)
RankChannel = list(zip(channels[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_p_greater[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail]))
# make pandas matrix
name_p_greater = r'$p_g$'
Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', name_p_greater, 'Threshold {}'.format(confidence_level)))
# define the table size
ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15]
# add rank label
Prodf = Prodf.assign(rank=np.arange(1, list_p_greater.shape[-1] + 1, 1))
# round importance
Prodf[name_p_greater] = Prodf[name_p_greater].astype(float).round(4) # = df.value1.round()
# re-sort columns to make rank labels coming first
cols = Prodf.columns.tolist()
cols = cols[-1:] + cols[:-1]
Prodf = Prodf[cols]
# take the first 5 high ranking channels
dcsummary = Prodf.iloc[:5]
# plot table on the top of the bar plot
table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths)
#table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths)
table.auto_set_font_size(False)
table.set_fontsize(6)
fig.subplots_adjust(hspace=0)
output_path = os.path.join(output_dir, output_file)
fig.savefig(output_path)
plt.close()
print('Completed Saving a plot of p_greater in frequency band VS channel')
return None