Source code for origli.utilities.multiband_search_utilities

#!/usr/bin/env python
'''
file name: multiband_search_utilities.py

This file contains the utilities to be used for multi-frequency band search
'''
##########################
# import module
###########################
import os
import sys
import h5py
import numpy as np
import itertools as itrtls
import time
import concurrent.futures
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import pandas as pd

try:
   from gwpy.timeseries import TimeSeries, TimeSeriesDict
except ImportError:
   print('There is not gwpy module yet')


sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

import origli.utilities.const as Const # for multi frequency bands defined in
from origli.utilities.utilities import Multiprocess_whitening, PlotTableAnalysis
#import origli.utilities.veto_utilities # uncomment for multi-band search on WitnessFlag mode



[docs]def HierarchyChannelAboveThreshold_single_channel_multiband(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sampling_rate, sigma): ''' description: calculate values of importance: a fraction of frequency bins in a frequency range above an upper bound of the off-source window for a single channel, in different frequency bnads USAGE: Counts_in_multibands = HierarchyChannelAboveThreshold_single_channel_multiband(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sigma) :param whitened_fft_target: whitened fft of the on-source window :param whitened_fft_PBG: whitened fft of the preceding off-source window :param whitened_fft_FBG: whitened fft of the following off-source window :param duration: a duration of the on-source window :param sampling_rate: sampling rate of a channel :param sigma: an integer to determine the upper bound of the off-source window :return: Counts_in_multibands: values of importance in different frequency bands, numpy array ''' import origli.utilities.veto_utilities as veto_util # add it for multi-band search on WitnessFlag mode # iterate different frequency bands in the list Counts_in_multibands = np.array([veto_util.HierarchyChannelAboveThreshold_single_channel(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sampling_rate, sigma, LowerCutOffFreq=lower_freq_cut, UpperCutOffFreq=upper_freq_cut) for lower_freq_cut, upper_freq_cut in Const.freq_bands]) return Counts_in_multibands
[docs]def CreateRho_multiband(full_timeseries, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end, sigma): ''' discription: 1. calculate the whitened FFT of the on- and off-source window for a single channel 2. compute the value of importance for a single channel USAGE: Counts_in_multibands, sample_rate = CreateRho_multiband(full_timeseries, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end, sigma) :param full_timeseries: the full time series in gwpy object including on- and off source windows :param target_timeseries_start: the start time of the on-source window :param target_timeseries_end: the end time of the on-source window :param pre_background_start: the start time of the preceding off-source window :param pre_background_end: the end time of the preceding off-source window :param fol_background_start: the start time of the following off-source window :param fol_background_end: the end time of the following off-source window :param sigma: an interger to calculate the value of importance :return: Counts_in_multibands: values of importance in different frequency bands where importance is a fraction of frequency bins in a frequency range above an upper bound of the off-source window for a single channel sample_rate: a sampling rate of a single channel ''' # calculate the whitened FFT of the on- and off-source windows whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, sample_rate, duration = Multiprocess_whitening(full_timeseries, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end) # calculate the importance Counts_in_multibands = HierarchyChannelAboveThreshold_single_channel_multiband(whitened_fft_target, whitened_fft_PBG, whitened_fft_FBG, duration, sample_rate, sigma) return Counts_in_multibands, sample_rate
[docs]def CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma): ''' description: 1. use a single glitch time 2. query timeseries of all the channels around a glitch 3. condition (whitening and compare the on- and off-source window) 4. quantify all the channels (compute values of importance of all the channels) USAGE: IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs, gpstime, duration, SNR, confi, ID = CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma) :param Listsegment: a list of segment parameters :param IFO :param channels: a list of safe channels :param number_process: a number of processes in parallel :param PlusHOFT: whether to get data of hoft, {'True' or 'False'} :param sigma: an integer to be used for calculating values of importance :return: IndexSatisfied: glitch index Mat_Count_in_multibands: a matrix of rho where frequencies in rows and channels in columns, numpy array list_sample_rates: a list of sampling rates of channels, numpy array re_sfchs: list of channels without "IFO:" at the beginning gpstime: a gps time duration: a value of duration SNR: signal to noise ratio in the h(t) conf: a confidence level of classification of a glitch, provided by Gravity Spy. Otherwise None ID: a glitch ID, provided by Gravity Spy in usual ''' channels = ['{0}:{1}'.format(IFO, ch) for ch in re_sfchs] IndexSatisfied, target_timeseries_start, target_timeseries_end, pre_background_start, pre_background_end, fol_background_start, fol_background_end, gpstime, duration, SNR, confi, ID = Listsegment t0 = time.time() # extracting the target data and the proceeding and following backgrounds try: # deal with time series in bad state if pre_background_start != 0 and pre_background_end != 0 and fol_background_start != 0 and fol_background_end != 0: # if there is an available proceeding background and if there is an available following background full_timeseriesDict = TimeSeriesDict.find(channels, pre_background_start, fol_background_end, frametype='{}_R'.format(IFO), verbose=False, nproc=number_process) elif pre_background_start != 0 and pre_background_end != 0 and fol_background_start == 0 and fol_background_end == 0: # if there is an available proceeding background and if there is NO available following background full_timeseriesDict = TimeSeriesDict.find(channels, pre_background_start, target_timeseries_end, frametype='{}_R'.format(IFO), verbose=False, nproc=number_process) elif pre_background_start == 0 and pre_background_end == 0 and fol_background_start != 0 and fol_background_end != 0: # if there is NO available proceeding background and if there is an available following background full_timeseriesDict = TimeSeriesDict.find(channels, target_timeseries_start, fol_background_end, frametype='{}_R'.format(IFO), verbose=False, nproc=number_process) else: pass if PlusHOFT == 'True': # get the data of hoft if pre_background_start != 0 and pre_background_end != 0 and fol_background_start != 0 and fol_background_end != 0: # if there is an available proceeding background and if there is an available following background full_timeseriesDict_hoft = TimeSeriesDict.find(['{}:GDS-CALIB_STRAIN'.format(IFO)], pre_background_start, fol_background_end, frametype='{}_HOFT_C00'.format(IFO), verbose=False, nproc=number_process) elif pre_background_start != 0 and pre_background_end != 0 and fol_background_start == 0 and fol_background_end == 0: # if there is an available proceeding background and if there is NO available following background full_timeseriesDict_hoft = TimeSeriesDict.find(['{}:GDS-CALIB_STRAIN'.format(IFO)], pre_background_start, target_timeseries_end, frametype='{}_HOFT_C00'.format(IFO), verbose=False, nproc=number_process) elif pre_background_start == 0 and pre_background_end == 0 and fol_background_start != 0 and fol_background_end != 0: # if there is NO available proceeding background and if there is an available following background full_timeseriesDict_hoft = TimeSeriesDict.find(['{}:GDS-CALIB_STRAIN'.format(IFO)], target_timeseries_start, fol_background_end, frametype='{}_HOFT_C00'.format(IFO), verbose=False, nproc=number_process) else: pass full_timeseriesDict.append(full_timeseriesDict_hoft) # append del full_timeseriesDict_hoft # delete the hoft object re_sfchs = np.append(re_sfchs, 'GDS-CALIB_STRAIN') except IndexError as err: print("IndexError: {}".format(err.message)) # IndexError: After reading object of type: FrAdcData stream is no longer in good state. ( Frmae: 0 ChannelType: L1:ASC-X_TR_A_NSUM_OUT_DQ offset: 932092099) return 0, 0, 0, 0, 0, 0, 0, 0, 0 except RuntimeError as err: print("RuntimeError: {}".format(err.message)) return 0, 0, 0, 0, 0, 0, 0, 0, 0 except IOError as err: # error when getting time series print('IOError: {}'.format(err.message)) return 0, 0, 0, 0, 0, 0, 0, 0, 0 except ValueError: # ValueError: Failed to read /hdfs/frames/O3/raw/H1/H-H1_R-12384/H-H1_R-1238489600-64.gwf: Unable to map from the stream class id (0) to internal class id at position: 18446744073709551601 [streamref: class: 0 instance: 0 length: 0 ] print('ValueError') return 0, 0, 0, 0, 0, 0, 0, 0, 0 try: Mat_Count_in_multibands = np.array([]) # a list of rho list_sample_rates = np.array([]) # whitened target and BG segments with multi processes with concurrent.futures.ProcessPoolExecutor(max_workers=number_process) as executor: for Counts_in_multibands, sample_rate in executor.map(CreateRho_multiband, full_timeseriesDict.values(), itrtls.repeat(target_timeseries_start), itrtls.repeat(target_timeseries_end), itrtls.repeat(pre_background_start), itrtls.repeat(pre_background_end), itrtls.repeat(fol_background_start), itrtls.repeat(fol_background_end), itrtls.repeat(sigma) ): # fill #Mat_Count_in_multibands = np.vstack([Mat_Count_in_multibands, Counts_in_multibands]) if Mat_Count_in_multibands.size else Counts_in_multibands Counts_in_multibands = Counts_in_multibands.reshape(Counts_in_multibands.shape[0], 1) Mat_Count_in_multibands = np.hstack([Mat_Count_in_multibands, Counts_in_multibands]) if Mat_Count_in_multibands.size else Counts_in_multibands list_sample_rates = np.append(list_sample_rates, sample_rate) except ValueError: print('*** delete group gps{:05d} due to ASD estimation error ***'.format(IndexSatisfied)) return 0, 0, 0, 0, 0, 0, 0, 0, 0 except IOError: # IOError: Driver write request failed print('*** delete group gps{:05d} due to memory error ***'.format(IndexSatisfied)) return 0, 0, 0, 0, 0, 0, 0, 0, 0 t1 = time.time() print('Time taken: {} s'.format(t1 - t0)) return IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs, gpstime, duration, SNR, confi, ID # pass the parameters
[docs]def SaveTargetAndBackGroundHDF5_multiband_OFFLINE(Listsegments, re_sfchs, IFO, outputpath, outputfilename, number_process, sigma, PlusHOFT='False'): ''' description: THIS IS USED FOR "OFFLINE" MODE 0. assuming Listsegments is given by Findglitchlist() 1.take the information of the list of allowed target and a preceding and following segments 2. whiten a target segmement based on the average background segment 3. find whitened FFT 4. save the whitened target and backgrounds FFTs Note this depends on USAGE: SaveTargetAndBackGroundHDF5(Listsegments, re_sfchs, IFO, outputpath, outputfilename, mode=='offline') :param Listsegments: a list of segment parameters :param channels: a list of safe channels :param outputpath: a directory of an output file :param outputfilename: a name of an output file :param number_process: a number of processes in parallel :param sigma: an integer to determine the upper bound of the off-source window :param PlusHOFT: whether to get data of hoft, {'True' or 'False'}, 'False' in default :return None ''' # the path and the file name outputPathFileName = os.path.join(outputpath, outputfilename) # make an output file try: f = h5py.File(outputPathFileName, 'w-') f.close() except IOError: print('The file already exists in {}'.format(outputPathFileName)) print('Please choose a different file name in the configuration file or delete this file if you want to overwrite it') sys.exit() # open a HDF5 file in writing mode with h5py.File(outputPathFileName, 'r+') as f: for Listsegment in Listsegments: IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs_used, gpstime, duration, SNR, confi, ID = CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma) if type(duration) == int: # if some error occurs, List_rho is 0, go to the next iteration continue else: g = f.create_group('gps{:05d}'.format(IndexSatisfied)) g.attrs['GPS'] = gpstime # a peak GPS time g.attrs['duration'] = duration g.attrs['snr'] = SNR # snr g.attrs['confidence'] = confi # confidence g.attrs['id'] = ID g.attrs['ifo'] = IFO dset_mat_rho = g.create_dataset("mat_rho", data=Mat_Count_in_multibands, compression="gzip", compression_opts=9) # a matrix of rho dset_sample_rates = g.create_dataset("sample_rates", data=list_sample_rates, compression="gzip", compression_opts=9) # list of sample rates of channels dset_channels = g.create_dataset("channels", data=[re_sfch.encode('utf8') for re_sfch in re_sfchs_used], compression="gzip", compression_opts=9) # list of channels freq_bands = np.array(Const.freq_bands).tolist() for i in range(len(freq_bands)): for j in range(len(freq_bands[0])): freq_bands[i][j] = freq_bands[i][j].encode('utf8') dset_freq_bands = g.create_dataset("freq_bands", data=freq_bands, compression="gzip", compression_opts=9) # matrix of frequency bnads
#dsetTarget.attrs['duration'] = List_DURATION[ch_label] # target
[docs]def SaveTargetAndBackGroundHDF5_multiband_ONLINE(Listsegments, re_sfchs, IFO, outputpath, outputfilename, number_process, sigma, PlusHOFT): ''' description: THIS IS USED FOR "ONLINE" MODE 0. assuming Listsegments is given by Findglitchlist() 1.take the information of the list of allowed target and a preceding and following segments 2. whiten a target segmement based on the average background segment 3. find whitened FFT 4. save the whitened target and backgrounds FFTs Note this depends on Multiprocess_whitening() USAGE: SaveTargetAndBackGroundHDF5(Listsegments, re_sfchs, IFO, outputpath, outputfilename, number_process, sigma, PlusHOFT) :param Listsegments: a list of segment parameters :param channels: a list of safe channels :param outputpath: a directory of an output file :param outputfilename: a name of an output file :param number_process: a number of processes in parallel :param sigma: an integer to determine the upper bound of the off-source window :param PlusHOFT: whether to get data of hoft, {'True' or 'False'} :return None ''' # the path and the file name outputPathFileName = os.path.join(outputpath, outputfilename) # make an output file and open it try: f = h5py.File(outputPathFileName, 'w-') f.close() except IOError: print('The file already exists in {}'.format(outputPathFileName)) print('Please choose a different file name in a configuration file or delete this file if you want to overwrite it') sys.exit() with h5py.File(outputPathFileName, 'r+') as f: # ================= for Listsegment in Listsegments: IndexSatisfied, Mat_Count_in_multibands, list_sample_rates, re_sfchs_used, gpstime, duration, SNR, confi, ID = CreateAllChannels_rho_multband(Listsegment, IFO, re_sfchs, number_process, PlusHOFT, sigma) if type(duration) == int: # if some error occurs, List_rho is 0, go to the next iteration continue else: g = f.create_group('gps{:05d}'.format(IndexSatisfied)) g.attrs['GPS'] = gpstime # a peak GPS time g.attrs['duration'] = duration g.attrs['snr'] = SNR # snr g.attrs['confidence'] = confi # confidence g.attrs['id'] = ID g.attrs['ifo'] = IFO dset_mat_rho = g.create_dataset("mat_rho", data=Mat_Count_in_multibands, compression="gzip", compression_opts=9) # a matrix of rho dset_sample_rates = g.create_dataset("sample_rates", data=list_sample_rates, compression="gzip", compression_opts=9) # list of sample rates of channels dset_channels = g.create_dataset("channels", data=[re_sfch.encode('utf8') for re_sfch in re_sfchs_used], compression="gzip", compression_opts=9) # list of channels freq_bands = np.array(Const.freq_bands).tolist() for i in range(len(freq_bands)): for j in range(len(freq_bands[0])): freq_bands[i][j] = freq_bands[i][j].encode('utf8') dset_freq_bands = g.create_dataset("freq_bands", data=freq_bands,compression="gzip", compression_opts=9) # matrix of frequency bnads #dsetTarget.attrs['duration'] = List_DURATION[ch_label] # target yield g # pass a group object as a generator
#+++++++++++++++++++++++++++++++++++++++++++++++ # plot tools #+++++++++++++++++++++++++++++++++++++++++++++++
[docs]class PlotTableAnalysis_multiband(): def __init__(self): self.f = None # a HDF5 file storing a glitch class
[docs] def getHDF5Object(self): ''' show a HDF5 file object USAGE: getHDF5Object() :return: a dictionary ''' if self.f == None: print('A HDF5 file object is not set yet') else: return self.f
[docs] def setHDF5Object(self, input_dir, input_file): ''' set a HDF5 file object :param f_dict: a dictionary of time series data sets :return: None ''' input_path = os.path.join(input_dir, input_file) f_input = h5py.File(input_path, 'r') # read it in the read mode self.f = f_input
[docs] def CreateMatCount_multiband(self, g_Individual = None): ''' description: 1. find counts for each glitch 2. stack over all the gliches 3. make a matix comprising importance versus channels dependencies: self.HierarchyChannelAboveThreshold(g, LowerCutOffFreq, UpperCutOffFreq) USAGE: MatCount, ListChannelName, ListSNR, ListConf, ListGPS, ListDuration, ListID, mat_rho, freq_bands, ListOriginalChannelName = CreateMatCount_multiband() # for all glitches MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName = CreateMatCount_multiband(g_Individual) # for an individual glitch :param g_Individual: a HDF5 file group for a glitch :return: MatCount: a matrix comprising importance versus channels ListChannelName: a list of channel names, combining frequency band information ListSNR: a list of SNRs ListConf: a list of confidence ListGPS: a list of GPS times ListID: a list of IDs mat_rho: a matrix of rho where frequencies in rows channels in columns, numpy array freq_bands: a matrix of frequency bands ListOriginalChannelName: a list of channel names, original ''' if g_Individual == None: # for all the glitches in a glitch class MatCount = np.array([]) ListSNR = np.array([]) ListConf = np.array([]) ListGPS = np.array([]) ListID = np.array([]) ListDuration = np.array([]) num_dataset = 0 # a number of datasets in a group, set to be 0 as a first instance for g_label in list(self.f.keys()): # iterate through all the groups try: g = self.f[g_label] except KeyError: # KeyError: 'Unable to open object (Bad object header version number)' continue # the following if statement is for the solution where no dataset in a group if len(g.keys()) < num_dataset: # if a number of dataset in this group is less than that of the previous group continue # go to the next iteration num_dataset = len(g.keys()) # update a number of datasets with that of the current group GPS = g.attrs['GPS'] # a GPS time duration = g.attrs['duration'] # a value of duration in sec SNR = g.attrs['snr'] # snr confidence = g.attrs['confidence'] # confidence ID = g.attrs['id'] # glich id ListOriginalChannelName = [channel.decode('utf8') for channel in g['channels'][()]] # a list of channels freq_bands = g['freq_bands'][()] # a matrix of frequency bands, e.g., np.array([['1', '128'], ['128', '256'], ...] #--- covert data type of each element to string from udf8 freq_bands = freq_bands.tolist() for i in range(len(freq_bands)): for j in range(len(freq_bands[0])): freq_bands[i][j] = freq_bands[i][j].decode('utf8') freq_bands = np.array(freq_bands) mat_rho = g['mat_rho'][()] # a matrix of rhos, in the structure of (# of channels, # freq bands) ListChannelName = np.array([np.core.defchararray.add(ListOriginalChannelName, '_BP{}'.format(int(i))) for i in range(freq_bands.shape[0])]) # a matrix of channels with freq_band, frequency bands in rows in channels in columns ListChannelName = ListChannelName.flatten(order='F') # list of revised channel names where ChannlA_BP0, ChannlA_BP1, ..., ChannlB_BP0 ListCount = mat_rho.flatten(order='F') # list of rhos, in the structure of (rho of channel A in band 0, rho of channel A in band 1, ..., rho of channel B in band 0) # stack the vector of counts MatCount = np.vstack([MatCount, ListCount]) if MatCount.size else ListCount # append SNR ListSNR = np.append(ListSNR, SNR) # append confidence level ListConf = np.append(ListConf, confidence) # append GPS time ListGPS = np.append(ListGPS, GPS) # append ID ListID = np.append(ListID, ID) # append duration ListDuration = np.append(ListDuration, duration) return MatCount, ListChannelName, ListSNR, ListConf, ListGPS, ListDuration, ListID, mat_rho, freq_bands, ListOriginalChannelName else: # individual glitch MatCount = np.array([]) GPS = g_Individual.attrs['GPS'] # a GPS time Duration = g_Individual.attrs['duration'] # a value of duration in sec SNR = g_Individual.attrs['snr'] # snr Conf = g_Individual.attrs['confidence'] # confidence ID = g_Individual.attrs['id'] # glich id ListOriginalChannelName = [channel.decode('utf8') for channel in g_Individual['channels'][()]] # a list of channels freq_bands = g_Individual['freq_bands'][()] # a matrix of frequency bands, e.g., np.array([['1', '128'], ['128', '256'], ...] # --- covert data type of each element to string from udf8 freq_bands = freq_bands.tolist() for i in range(len(freq_bands)): for j in range(len(freq_bands[0])): freq_bands[i][j] = freq_bands[i][j].decode('utf8') freq_bands = np.array(freq_bands) mat_rho = g_Individual['mat_rho'][()] # a matrix of rhos, in the structure of (# of channels, # freq bands) # make a matrix of channels with band information added to the end of the channel name, where frequency bands are in rows from top to bottom and channels in columes ListChannelName = np.array([np.core.defchararray. add(ListOriginalChannelName, '_BP{}'.format(int(i))) for i in range(freq_bands.shape[0])]) ListChannelName = ListChannelName.flatten(order='F') # list of revised channel names where ChannlA_BP0, ChannlA_BP1, ..., ChannlB_BP0 ListCount = mat_rho.flatten(order='F') # list of rhos, in the structure of (rho of channel A in band 0, rho of channel A in band 1, ..., rho of channel B in band 0) # stack the vector of counts MatCount = np.vstack([MatCount, ListCount]) if MatCount.size else ListCount return MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName
[docs] def PlotFrequencyVSChannel(self, glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file): ''' description: make a plot of frequencies versus channels for a glitch dependencies: CreateChannelTicks() USAGE: PlotFrequencyVSChannel(glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file) :param glitchtype: a type of a glitch :param SNR: SNR in h(t) :param Conf: classification confidence level :param GPS: a gps time :param Duration: a duration of a gltich :param ID: Gravity Spy ID :param URL: Q-transform in h(t) of a glitch store in Gravity Spy :param mat_rho: a matrix of rho where frequencies in rows channels in columns, numpy array :param ListOriginalChannelName: a list of original channel names :param freq_bands: a matrix of frequency bands :param output_dir: :param output_file: :return: None ''' # color the background either with blakc or gray for each channel Listcolor = ['black', 'gray'] * 20 * 7 # get xticks for dominant channels plotter = PlotTableAnalysis() CenterTicks, ListInd, ListSubsys = plotter.CreateChannelTicks(ListOriginalChannelName) fig = plt.figure() # make grid for subplots gs = GridSpec(9, 30) # a subplot showing channel position ax1 = fig.add_subplot(gs[:1, :-1]) # First row, up to the second last column for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0): if edgeRight == mat_rho.shape[-1] - 1: # adjust the right edge of the last color span edgeRight = edgeRight + 1 ax1.axvspan(edgeLeft-0.5, edgeRight-0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None) # limit the xaxis of the first subplot ax1.axis(xmin=-0.5, xmax=round(mat_rho.shape[-1] * 1.001) -1 + 0.5) # get rid of ticks of the first plot ax1.yaxis.set_major_locator(plt.NullLocator()) ax1.xaxis.set_major_locator(plt.NullLocator()) ax1.set_title('{}, SNR = {:.02f}, confidence = {:.02f}, \n GPS = {:.02f}, ID = {}, duration = {} s\n Q-transform of h(t): {}'.format(glitchtype, SNR, Conf, GPS, ID, Duration, URL), fontsize=10) # assign a size for the second subplot ax2 = fig.add_subplot(gs[1:, :-1]) #MatCount = np.log10(MatCount+1) mat_rho = mat_rho[::-1, ...] # flip the matrix around x axis such that it has order of band 0, band 1, band 2, ... from the bottom CMAP = 'OrRd' ax2.imshow(mat_rho, aspect='auto', interpolation='nearest', cmap=getattr(plt.cm, CMAP), origin='upper') ax2.axis(xmin=-0.5, xmax=round(mat_rho.shape[-1] * 1.001) - 1 + 0.5) ax2.set_xticks(CenterTicks) ax2.set_xticklabels(ListSubsys[ListInd.astype(int)]) ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=5, length=0, labelrotation=90, size=0, labelsize=10) # ============= # ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in freq_bands[::-1, ...]] ListYlabels[0] = 'All' # change the name of None-None to All ax2.set_yticks(range(freq_bands.shape[0])) ax2.set_yticklabels(ListYlabels) ax2.tick_params(axis="y", labelsize=5, labelrotation=45) ax2.grid(False) ax2.set_ylabel('Frequency band [Hz]', fontsize=15) ax2.set_xlabel('Auxiliary channel', fontsize=15) ###### Make a color bar #cmap = matplotlib.cm.viridis norm = matplotlib.colors.Normalize(vmin=mat_rho.min(), vmax=mat_rho.max()) ax3 = fig.add_subplot(gs[1:, -1]) cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=getattr(plt.cm, CMAP), norm=norm, orientation='vertical', drawedges=False) ax3.grid(False) cbar.set_label(r'$q$', size=15) ######## fig.subplots_adjust(hspace=0) output_path = os.path.join(output_dir, output_file) fig.savefig(output_path) plt.close() print('Completed Saving a plot of frequency band VS channel') # #-----------------------------------------------------------------+ # # make .csv file comprising GPS, importance of all the channels | # #-----------------------------------------------------------------+ # # concatenate a list of GPS times and importance of all the channels # MatGPSDurationImpotanceChannels = np.concatenate((ListGPS.reshape(ListGPS.shape[0], 1), ListDuration.reshape(ListDuration.shape[0], 1), MatCount), axis=1) # # make a list of column names # columnName = np.append(np.array(['GPS']), np.array(['duration'])) # columnName = np.append(columnName, ListChannelName) # # convert it to a pandas frame # dfGPSDurationImportanceChannels = pd.DataFrame(MatGPSDurationImpotanceChannels, columns=columnName) # path_output_fileGPSImportanceChannels = os.path.join(output_dir, 'GPSDurationImportanceChannels.csv') # # save it as .csv file # dfGPSDurationImportanceChannels.to_csv(path_output_fileGPSImportanceChannels) # print('Completed saving a table comprising GPS times, durations, and importance of the channels') return None
[docs] def PlotIndividualFCS_ImportanceVSChannel_multiband(self, glitchtype, IFO, GravitySpy_df, output_dir, mode='offline', sigma=None, Listsegments=None, re_sfchs=None, Data_outputpath=None, Data_outputfilename=None, PlusHOFT='False', number_process=None): ''' description: 1. load a file comprising all glitches in a class 2. create a plot comprising frequency versus channel & importance versus channel - dependency: self.CreateChannelTicks(ListChannel) self.make_subset_channel_based_on_samplingrate() self.CreateMatCount() self.PlotImportanceVSChannel() 3. save a plot dependencies: make_subset_channel_based_on_samplingrate(), CreateChannelTicks(), CreateMatCount(), PlotImportanceVSChannel() USAGE: PlotIndividualFCS_ImportanceVSChannel_multiband(self, glitchtype, IFO, GravitySpy_df, output_dir, mode='offline', Listsegments=None, re_sfchs=None, Data_outputpath=None, Data_outputfilename=None, PlusHOFT='False', number_process=None) :param glitchtype: a type of glitch used for create a name of a plot :param IFO: # a type of IFO used in a name of a plot :param GravitySpy_df: a meta data of Gravity Spy in pandas frame :param output_dir: a output directory :param simga: an integer number used for the upper bound of BG noise :param mode: 'offline' or 'online' :param sigma: an integer to determine the upper bound of the off-source window :param Listsegments: a list of allowed glitches, which is used for online mode only, None in default :param re_sfchs: a list of safe channels except unused channels, which is used for online mode only, None in default :param Data_outputpath: a directory saving for a HDF5 file, which is used for online mode only, None in default :param Data_outputfilename: a file saving for a HDF5 file, which is used for online mode only, None in default :param PlusHOFT: whether to get data of HOFT {'True', 'False'}, which is used for online mode only, 'False' in default :param number_process: a number of processes in parallel, which is used for online mode only, None in default return None ''' if mode == 'offline': for group_label in list(self.f.keys()): # iterate though all glitches in a class g = self.f[group_label] if len(g.keys()) == 0: # if no dataset in this group, go to a next iteration continue # ========================= MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName = self.CreateMatCount_multiband(g) # get a URL of Q-transform of h(t) try: URL = GravitySpy_df[GravitySpy_df['id'] == ID]['imgUrl'].tolist()[0] except IndexError: # deal with the one with no Q-transport plot of h(t) URL = 'None' output_file = '{}.pdf'.format(str(GPS).replace('.', '-')) self.PlotFrequencyVSChannel(glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file) print('Completed plotting for a(n) {} glitch in {} at {}'.format(glitchtype, IFO, str(GPS))) elif mode == 'online': for g in SaveTargetAndBackGroundHDF5_multiband_ONLINE(Listsegments, re_sfchs, IFO, Data_outputpath, Data_outputfilename, number_process, sigma, PlusHOFT): # iterate group by group # ========================= MatCount, ListChannelName, SNR, Conf, GPS, Duration, ID, mat_rho, freq_bands, ListOriginalChannelName = self.CreateMatCount_multiband(g) # get a URL of Q-transform of h(t) try: URL = GravitySpy_df[GravitySpy_df['id'] == ID]['imgUrl'].tolist()[0] except IndexError: # deal with the one with no Q-transport plot of h(t) URL = 'None' output_file = '{}.pdf'.format(str(GPS).replace('.', '-')) self.PlotFrequencyVSChannel(glitchtype, SNR, Conf, GPS, Duration, ID, URL, mat_rho, ListOriginalChannelName, freq_bands, output_dir, output_file) print('Completed plotting for a(n) {} glitch in {} at {}'.format(glitchtype, IFO, str(GPS))) return None
[docs] def ReconstructFromFlattenedList(self, flattened_list, freq_bands=Const.freq_bands): ''' description: make the matrix of the original order from the flatten USAGE: mat_originl, mat_flipped = ReconstructFromFlattenedList(flattened_list, freq_bands=Const.freq_bands) :param flattened_list: flattend list from the matrix using np.flatten(order='F') :param freq_bands: frequency bands :return: mat_originl: a matrix where frequency bnads in row from the top to bottom, and channels in columns mat_flipped: a matrix where # frequency bnads in row from the bottom to top, and channels in columns ''' num_freq_bands = len(freq_bands) # number of frquency bands num_channels = int(flattened_list.shape[0] / num_freq_bands) # number of channels mat_originl = flattened_list.reshape(num_freq_bands, num_channels, order='F') # frequency bnads in row from the top to bottom, and channels in columns mat_flipped = mat_originl[::-1, ...] # frequency bnads in row from the bottom to top, and channels in columns return mat_originl, mat_flipped
[docs] def PlotCausalityVSChannelMultiBandNoMaskNoTable(self, list_Causal_passed, list_Causal_fail, list_causal_passed_err, list_causal_failed_err, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence, freq_bands=Const.freq_bands): ''' description: plot witness ratio statistics of chanenls in multi-frequency bands without mask and table USAGE: PlotCausalityVSChannelMultiBandNoMaskNoTable(list_Causal_passed, list_Causal_fail, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence) :param list_Causal_passed: a list of the probability of the causality that are passed one-tailed Binomial test, if not zero :param list_Causal_fail: a list of the probability of the causality that are failed one-tailed Binomial test, if not zero :param list_causal_passed_err: a list of the error of the causal probability that are passed one-tailed Binomial test, otherwise, zero :param list_causal_failed_err: a list of the error of the causal probability that are failed one-tailed Binomial test, otherwise, zero :param list_Test: a list of results of the Binomial test, 'pass' or 'fail' :param ListChannelName: a list of channel names :param output_dir: (only used for all glitches) :param output_file: (only used for all glitches) :param BinomialTestConfidence: binomial test confidence level :param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py :return: None ''' plotter = PlotTableAnalysis() # get the channel name matrix MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(ListChannelName) # get the channel name original ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]]) # a list of the probability of the causality list_Causal = list_Causal_passed + list_Causal_fail # re-order the causal probabilities MatCausal, MatCausalFlipped = self.ReconstructFromFlattenedList(list_Causal) # re-order the statistical test MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test) # color the background either with blakc or gray for each channel Listcolor = ['black', 'gray'] * 20 * 7 # get xticks for dominant channels CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal) fig = plt.figure() # make grid for subplots gs = GridSpec(9, 30) # a subplot showing channel position ax1 = fig.add_subplot(gs[:1, :-1]) # First row, up to the second last column for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0): if edgeRight == MatCausalFlipped.shape[-1] - 1: # adjust the right edge of the last color span edgeRight = edgeRight + 1 ax1.axvspan(edgeLeft, edgeRight, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None) # limit the xaxis of the first subplot ax1.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1] * 0.001) -1 + 0.5) # get rid of ticks of the first plot ax1.yaxis.set_major_locator(plt.NullLocator()) ax1.xaxis.set_major_locator(plt.NullLocator()) # assign a size for the second subplot ax2 = fig.add_subplot(gs[1:, :-1]) #MatCount = np.log10(MatCount+1) #ax2.imshow(MatCausalFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') MatCausalFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatCausalFlipped) ax2.imshow(MatCausalFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') ax2.set_xticks(CenterTicks) ax2.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1] * 0.001) - 1 + 0.5) ax2.set_xticklabels(ListSubsys[ListInd.astype(int)]) ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=5, length=0, labelrotation=90, size=0, labelsize=10) # ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in freq_bands[::-1, ...]] ListYlabels[0] = 'All' # change the name of None-None to All ax2.set_yticks(range(len(freq_bands))) ax2.set_yticklabels(ListYlabels) ax2.tick_params(axis="y", labelsize=5, labelrotation=45) ax2.grid(False) ax2.set_ylabel('Frequency band [Hz]', fontsize=15) ax2.set_xlabel('Auxiliary channel', fontsize=15) ###### Make a color bar cmap = matplotlib.cm.viridis norm = matplotlib.colors.Normalize(vmin=MatCausalFlipped.min(), vmax=MatCausalFlipped.max()) ax3 = fig.add_subplot(gs[1:, -1]) cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False) ax3.grid(False) cbar.set_label('Witness ratio statistic', size=10) ######## fig.subplots_adjust(hspace=0) output_path = os.path.join(output_dir, output_file) fig.savefig(output_path) plt.close() print('Completed Saving a plot of WRS in frequency band VS channel') return None
[docs] def PlotCausalityVSChannelMultiBand(self, list_Causal_passed, list_Causal_fail, list_causal_passed_err, list_causal_failed_err, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence, freq_bands=Const.freq_bands): ''' description: plot witness ratio statistics of chanenls in multi-frequency bands the cells which do not pass the test are masked USAGE: PlotCausalityVSChannelMultiBand(list_Causal_passed, list_Causal_fail, list_Test, ListChannelName, output_dir, output_file, BinomialTestConfidence) :param list_Causal_passed: a list of the probability of the causality that are passed one-tailed Binomial test, if not zero :param list_Causal_fail: a list of the probability of the causality that are failed one-tailed Binomial test, if not zero :param list_causal_passed_err: a list of the error of the causal probability that are passed one-tailed Binomial test, otherwise, zero :param list_causal_failed_err: a list of the error of the causal probability that are failed one-tailed Binomial test, otherwise, zero :param list_Test: a list of results of the Binomial test, 'pass' or 'fail' :param ListChannelName: a list of channel names :param output_dir: (only used for all glitches) :param output_file: (only used for all glitches) :param BinomialTestConfidence: binomial test confidence level :param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py :return: None ''' plotter = PlotTableAnalysis() # get the channel name matrix MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(ListChannelName) # get the channel name original ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]]) # a list of the probability of the causality list_Causal = list_Causal_passed + list_Causal_fail # re-order the causal probabilities MatCausal, MatCausalFlipped = self.ReconstructFromFlattenedList(list_Causal) # re-order the statistical test MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test) # color the background either with blakc or gray for each channel Listcolor = ['black', 'gray'] * 20 * 7 # get xticks for dominant channels CenterTicks, ListInd, ListSubsys = plotter.CreateChannelTicks(ListChannelNameOriginal) fig = plt.figure() # make grid for subplots gs = GridSpec(13, 30) # a subplot showing channel position ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0): if edgeRight == MatCausalFlipped.shape[-1] - 1: # adjust the right edge of the last color span edgeRight = edgeRight + 1 ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None) # ax1.set_edgecolor(None) # limit the xaxis of the first subplot ax1.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1]*1.001)-1+0.5) # get rid of ticks of the first plot ax1.yaxis.set_major_locator(plt.NullLocator()) ax1.xaxis.set_major_locator(plt.NullLocator()) # assign a size for the second subplot ax2 = fig.add_subplot(gs[5:, :-1]) #ax2.imshow(MatCausalFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') MatCausalFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatCausalFlipped) ax2.imshow(MatCausalFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper') #ax2.imshow(MatCausalFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') ax2.set_xticks(CenterTicks) ax2.axis(xmin=-0.5, xmax=round(MatCausalFlipped.shape[-1] * 1.001) - 1+0.5) ax2.set_xticklabels(ListSubsys[ListInd.astype(int)]) ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7) # ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]] ListYlabels[0] = 'All' # change the name of None-None to All ax2.set_yticks(range(len(freq_bands))) ax2.set_yticklabels(ListYlabels) ax2.tick_params(axis="y", labelsize=5, labelrotation=45) ax2.grid(False) ax2.set_ylabel('Frequency band [Hz]', fontsize=15) ax2.set_xlabel('Auxiliary channel', fontsize=15) ###### Make a color bar cmap = matplotlib.cm.Oranges norm = matplotlib.colors.Normalize(vmin=MatCausalFlipped.min(), vmax=MatCausalFlipped.max()) ax3 = fig.add_subplot(gs[5:, -1]) cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False) ax3.grid(False) cbar.set_label('Witness ratio statistic', size=10) # +++++++++++ plot a table of ranking channels +++++++++++++++++++ # a list of the probability of the causality #list_Causal = list_Causal_passed + list_Causal_fail # list of the error of the probability of the causality # list_causal_err = list_causal_passed_err + list_causal_failed_err # sort channel based on the probability of the causality # make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All" list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands]) # As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ... # A list of the corresponding frequency bands are created # Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...] # Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ... list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), ListChannelName.shape[0], axis=1).flatten(order='F') # get rid of the BD# at the end of the channel name to make it original name ListChannelName = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in ListChannelName]) # sort the links based on values of statistic list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_Causal, list_Test) RankChannel = list(zip(ListChannelName[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_Causal[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail])) # make pandas matrix Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', 'WRS', 'Test{}'.format(BinomialTestConfidence))) # define the table size ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15] # add rank label Prodf = Prodf.assign(rank=np.arange(1, list_Causal_passed.shape[-1] + 1, 1)) # round importance Prodf['WRS'] = Prodf['WRS'].astype(float).round(3) # = df.value1.round() # re-sort columns to make rank labels coming first cols = Prodf.columns.tolist() cols = cols[-1:] + cols[:-1] Prodf = Prodf[cols] # take the first 5 high ranking channels dcsummary = Prodf.iloc[:5] # plot table on the top of the bar plot table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths) #table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths) table.auto_set_font_size(False) table.set_fontsize(6) fig.subplots_adjust(hspace=0) output_path = os.path.join(output_dir, output_file) fig.savefig(output_path) plt.close() print('Completed Saving a plot of WRS in frequency band VS channel') return None
[docs] def Plot_Welch_t_test_MultiBand(self, channels, list_t_values_passed, list_t_values_failed, list_Test, confidence_level, output_dir, output_file, freq_bands=Const.freq_bands): ''' description: plot the result of one-sided Welch t-test USAGE: Plot_Welch_t_test(channels, list_t_values_passed, list_t_values_failed, list_Test, output_dir, output_file) :param channels: a list of channels :param list_t_values_passed: a list of t-values that pass the test :param list_t_values_failed: a list of t-values that fail the test :param list_Test: a list of the test results {'pass', 'fail'} :param confidence_level: a confidence level :param output_dir: an output directory :param output_file: an output file name :param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py :return: None ''' plotter = PlotTableAnalysis() # get the channel name matrix MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(channels) # get the channel name original ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]]) # a list of the probability of the causality list_t_values = list_t_values_passed + list_t_values_failed # re-order the causal probabilities MatTvalue, MatTValueFlipped = self.ReconstructFromFlattenedList(list_t_values) # re-order the statistical test MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test) # color the background either with blakc or gray for each channel Listcolor = ['black', 'gray'] * 20 * 7 # get xticks for dominant channels CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal) fig = plt.figure() # make grid for subplots gs = GridSpec(13, 30) # a subplot showing channel position ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0): if edgeRight == MatTValueFlipped.shape[-1] - 1: # adjust the right edge of the last color span edgeRight = edgeRight + 1 ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None) # limit the xaxis of the first subplot ax1.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) -1 + 0.5) # get rid of ticks of the first plot ax1.yaxis.set_major_locator(plt.NullLocator()) ax1.xaxis.set_major_locator(plt.NullLocator()) # assign a size for the second subplot ax2 = fig.add_subplot(gs[5:, :-1]) #ax2.imshow(MatTValueFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') MatTValueFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatTValueFlipped) ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper') #ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') ax2.set_xticks(CenterTicks) ax2.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) - 1 + 0.5) ax2.set_xticklabels(ListSubsys[ListInd.astype(int)]) ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7) # ytick are named after the frequency band, this is flipped around x-axis to match the mat_rho ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]] ListYlabels[0] = 'All' # change the name of None-None to All ax2.set_yticks(range(len(freq_bands))) ax2.set_yticklabels(ListYlabels) ax2.tick_params(axis="y", labelsize=5, labelrotation=45) ax2.grid(False) ax2.set_ylabel('Frequency band [Hz]', fontsize=15) ax2.set_xlabel('Auxiliary channel', fontsize=15) ###### Make a color bar cmap = matplotlib.cm.Oranges norm = matplotlib.colors.Normalize(vmin=MatTValueFlipped.min(), vmax=MatTValueFlipped.max()) ax3 = fig.add_subplot(gs[5:, -1]) cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False) ax3.grid(False) cbar.set_label('t-value', size=10) # +++++++++++ plot a table of ranking channels +++++++++++++++++++ # a list of the probability of the causality #list_Causal = list_Causal_passed + list_Causal_fail # list of the error of the probability of the causality # list_causal_err = list_causal_passed_err + list_causal_failed_err # sort channel based on the probability of the causality # make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All" list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands]) # As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ... # A list of the corresponding frequency bands are created # Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...] # Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ... list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1).flatten(order='F') # get rid of the BD# at the end of the channel name to make it original name channels = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in channels]) # sort the links based on values of statistic list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_t_values, list_Test) RankChannel = list(zip(channels[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_t_values[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail])) # make pandas matrix Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', 't-value', 'Test{}'.format(confidence_level))) # define the table size ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15] # add rank label Prodf = Prodf.assign(rank=np.arange(1, list_t_values.shape[-1] + 1, 1)) # round importance Prodf['t-value'] = Prodf['t-value'].astype(float).round(3) # = df.value1.round() # re-sort columns to make rank labels coming first cols = Prodf.columns.tolist() cols = cols[-1:] + cols[:-1] Prodf = Prodf[cols] # take the first 5 high ranking channels dcsummary = Prodf.iloc[:5] # plot table on the top of the bar plot table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths) #table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths) table.auto_set_font_size(False) table.set_fontsize(6) fig.subplots_adjust(hspace=0) output_path = os.path.join(output_dir, output_file) fig.savefig(output_path) plt.close() print('Completed Saving a plot of t-value in frequency band VS channel') return None
[docs] def Plot_WRS_Welch_t_test_MultiBand(self, channels, list_Causal_passed, list_Causal_fail, list_Test_binomial, list_t_values_passed, list_t_values_failed, list_t_Test, confidence_level, output_dir, output_file, freq_bands=Const.freq_bands): ''' description: plot the result of one-sided Welch t-test USAGE: Plot_Welch_t_test(channels, list_t_values_passed, list_t_values_failed, list_Test, output_dir, output_file) :param channels: a list of channels :param list_t_values_passed: a list of t-values that pass the test :param list_t_values_failed: a list of t-values that fail the test :param list_Test: a list of the test results {'pass', 'fail'} :param confidence_level: a confidence level :param output_dir: an output directory :param output_file: an output file name :param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py :return: None ''' plotter = PlotTableAnalysis() # get the channel name matrix MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(channels) # get the channel name original ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]]) # a list of the probability of the t-values list_t_values = list_t_values_passed + list_t_values_failed # a list of the probability of the causality list_Causal = list_Causal_passed + list_Causal_fail # make a list of WRS times t-value list_WRS_t_values = list_Causal * list_t_values # re-order the WRS * t-value MatWRSTvalue, MatWRSTValueFlipped = self.ReconstructFromFlattenedList(list_WRS_t_values) # make test results combining binomial and t-test list_Test = np.array([]) for test_binomial, t_test in zip(list_Test_binomial, list_t_Test): if test_binomial == 'pass' and t_test == 'pass': list_Test = np.append(list_Test, "pass") else: list_Test = np.append(list_Test, "fail") # re-order the t-test MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test) # color the background either with blakc or gray for each channel Listcolor = ['black', 'gray'] * 20 * 7 # get xticks for dominant channels CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal) fig = plt.figure() # make grid for subplots gs = GridSpec(13, 30) # a subplot showing channel position ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0): if edgeRight == MatWRSTValueFlipped.shape[-1] - 1: # adjust the right edge of the last color span edgeRight = edgeRight + 1 ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None) # limit the xaxis of the first subplot ax1.axis(xmin=-0.5, xmax=round(MatWRSTValueFlipped.shape[-1] * 1.001) - 1 + 0.5) # get rid of ticks of the first plot ax1.yaxis.set_major_locator(plt.NullLocator()) ax1.xaxis.set_major_locator(plt.NullLocator()) # assign a size for the second subplot ax2 = fig.add_subplot(gs[5:, :-1]) #ax2.imshow(MatWRSTValueFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') MatWRSTValueFlippedMasked = np.ma.masked_where(MatTestFlipped=='fail', MatWRSTValueFlipped) ax2.imshow(MatWRSTValueFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper') #ax2.imshow(MatWRSTValueFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') ax2.set_xticks(CenterTicks) ax2.axis(xmin=-0.5, xmax=round(MatWRSTValueFlipped.shape[-1] * 1.001) - 1 + 0.5) ax2.set_xticklabels(ListSubsys[ListInd.astype(int)]) ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7) # ytick are named after the frequency band, this is fliped around x-axis to match the mat_rho ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]] ListYlabels[0] = 'All' # change the name of None-None to All ax2.set_yticks(range(len(freq_bands))) ax2.set_yticklabels(ListYlabels) ax2.tick_params(axis="y", labelsize=5, labelrotation=45) ax2.grid(False) ax2.set_ylabel('Frequency band [Hz]', fontsize=15) ax2.set_xlabel('Auxiliary channel', fontsize=15) ###### Make a color bar cmap = matplotlib.cm.Oranges norm = matplotlib.colors.Normalize(vmin=MatWRSTValueFlipped.min(), vmax=MatWRSTValueFlipped.max()) ax3 = fig.add_subplot(gs[5:, -1]) cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False) ax3.grid(False) cbar.set_label('WRS * t-value', size=10) # +++++++++++ plot a table of ranking channels +++++++++++++++++++ # a list of the probability of the causality #list_Causal = list_Causal_passed + list_Causal_fail # list of the error of the probability of the causality # list_causal_err = list_causal_passed_err + list_causal_failed_err # sort channel based on the probability of the causality # make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All" list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands]) # As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ... # A list of the corresponding frequency bands are created # Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...] # Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ... list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0], 1), channels.shape[0], axis=1).flatten(order='F') # get rid of the BD# at the end of the channel name to make it original name channels = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in channels]) # sort the links based on values of statistic list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_WRS_t_values, list_Test) RankChannel = list(zip(channels[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_WRS_t_values[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail])) # make pandas matrix Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', 'WRS*t-value', 'Test{}'.format(confidence_level))) # define the table size ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15] # add rank label Prodf = Prodf.assign(rank=np.arange(1, list_WRS_t_values.shape[-1] + 1, 1)) # round importance Prodf['WRS*t-value'] = Prodf['WRS*t-value'].astype(float).round(3) # = df.value1.round() # re-sort columns to make rank labels coming first cols = Prodf.columns.tolist() cols = cols[-1:] + cols[:-1] Prodf = Prodf[cols] # take the first 5 high ranking channels dcsummary = Prodf.iloc[:5] # plot table on the top of the bar plot table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths) #table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths) table.auto_set_font_size(False) table.set_fontsize(6) fig.subplots_adjust(hspace=0) output_path = os.path.join(output_dir, output_file) fig.savefig(output_path) plt.close() print('Completed Saving a plot of WRS*t-value in frequency band VS channel') return None
[docs] def Plot_p_greater_MultiBand(self, channels, list_p_greater_passed, list_p_greater_failed, list_Test, confidence_level, output_dir, output_file, freq_bands=Const.freq_bands): ''' description: plot the result of p_greater USAGE: Plot_p_greater_MultiBand(self, channels, list_p_greater_passed, list_p_greater_failed, list_Test, confidence_level, output_dir, output_file) :param channels: a list of channels :param list_p_greater_passed: a list of p_greater that pass the test :param list_p_greater_failed: a list of p_greater that fail the test :param list_Test: a list of the test results {'pass', 'fail'} :param confidence_level: a confidence level :param output_dir: an output directory :param output_file: an output file name :param freq_bands: frequency bands used for the multi-frequency band search, which is defined in const.py :return: None ''' plotter = PlotTableAnalysis() # get the channel name matrix MatChannelName, MatChannelNameFlipped = self.ReconstructFromFlattenedList(channels) # get the channel name original ListChannelNameOriginal = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in MatChannelName[0,:]]) # a list of the probability of the causality list_p_greater = list_p_greater_passed + list_p_greater_failed # re-order the causal probabilities MatTvalue, MatTValueFlipped = self.ReconstructFromFlattenedList(list_p_greater) # re-order the statistical test MatTest, MatTestFlipped = self.ReconstructFromFlattenedList(list_Test) # color the background either with blakc or gray for each channel Listcolor = ['black', 'gray'] * 20 * 7 # get xticks for dominant channels CenterTicks, ListInd, ListSubsys= plotter.CreateChannelTicks(ListChannelNameOriginal) fig = plt.figure() # make grid for subplots gs = GridSpec(13, 30) # a subplot showing channel position ax1 = fig.add_subplot(gs[4:5, :-1]) # First row, up to the second last column for color_ind, (edgeLeft, edgeRight) in enumerate(zip(ListInd[:-1], ListInd[1:]), 0): if edgeRight == MatTValueFlipped.shape[-1] - 1: # adjust the right edge of the last color span edgeRight = edgeRight + 1 ax1.axvspan(edgeLeft - 0.5, edgeRight - 0.5, facecolor=Listcolor[color_ind], alpha=0.5, edgecolor=None) # limit the xaxis of the first subplot ax1.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) -1 + 0.5) # get rid of ticks of the first plot ax1.yaxis.set_major_locator(plt.NullLocator()) ax1.xaxis.set_major_locator(plt.NullLocator()) # assign a size for the second subplot ax2 = fig.add_subplot(gs[5:, :-1]) #ax2.imshow(MatTValueFlipped, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') MatTValueFlippedMasked = np.ma.masked_where(MatTestFlipped =='fail', MatTValueFlipped) ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='none', cmap=plt.cm.Oranges, origin='upper') #ax2.imshow(MatTValueFlippedMasked, aspect='auto', interpolation='nearest', cmap=plt.cm.viridis, origin='upper') ax2.set_xticks(CenterTicks) ax2.axis(xmin=-0.5, xmax=round(MatTValueFlipped.shape[-1] * 1.001) - 1 + 0.5) ax2.set_xticklabels(ListSubsys[ListInd.astype(int)]) ax2.tick_params(axis="x", labelbottom=False, labeltop=True, direction='in', pad=2, length=0, labelrotation=90, size=0, labelsize=7) # ytick are named after the frequency band, this is flipped around x-axis to match the mat_rho ListYlabels = ['{}-{}'.format(lower, upper) for lower, upper in np.array(freq_bands)[::-1, ...]] ListYlabels[0] = 'All' # change the name of None-None to All ax2.set_yticks(range(len(freq_bands))) ax2.set_yticklabels(ListYlabels) ax2.tick_params(axis="y", labelsize=5, labelrotation=45) ax2.grid(False) ax2.set_ylabel('Frequency band [Hz]', fontsize=15) ax2.set_xlabel('Auxiliary channel', fontsize=15) ###### Make a color bar cmap = matplotlib.cm.Oranges norm = matplotlib.colors.Normalize(vmin=MatTValueFlipped.min(), vmax=MatTValueFlipped.max()) ax3 = fig.add_subplot(gs[5:, -1]) cbar = matplotlib.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation='vertical', drawedges=False) ax3.grid(False) cbar.set_label(r'$p_g$', size=10) # +++++++++++ plot a table of ranking channels +++++++++++++++++++ # a list of the probability of the causality #list_Causal = list_Causal_passed + list_Causal_fail # list of the error of the probability of the causality # list_causal_err = list_causal_passed_err + list_causal_failed_err # sort channel based on the probability of the causality # make the lisk of frequency band, if no-frequency bnad limit is not specified (None-None), it is replaced with "All" list_freq_bands = np.array(["{}-{}".format(lower, upper) if lower != 'None' else "All" for lower, upper in freq_bands]) # As a list of the modified channel name is structure in Channel A in band 0, Channel A in band 1, .., Channel B in band 0, ... # A list of the corresponding frequency bands are created # Note that np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1) is [[band 0, band 0, band 0, band 0, ..., band 0], [band 1, band 1, band 1, ...,, ], ..., [band 7, band 7], ...] # Note flatten(order='F') flattened the array such that [band 0 , band 1, ..., band 7, band 0, band 1, ..., band 7], ... list_freq_bands = np.repeat(list_freq_bands.reshape(list_freq_bands.shape[0],1), channels.shape[0], axis=1).flatten(order='F') # get rid of the BD# at the end of the channel name to make it original name channels = np.array([ChannelName.rsplit("_BP", 1)[0] for ChannelName in channels]) # sort the links based on values of statistic list_sorted_base_index_pass_fail, _, _ = plotter.ranking_channels(list_p_greater, list_Test) RankChannel = list(zip(channels[list_sorted_base_index_pass_fail], list_freq_bands[list_sorted_base_index_pass_fail], list_p_greater[list_sorted_base_index_pass_fail], list_Test[list_sorted_base_index_pass_fail])) # make pandas matrix name_p_greater = r'$p_g$' Prodf = pd.DataFrame(np.array(RankChannel), columns=('Channel', 'Band [Hz]', name_p_greater, 'Threshold {}'.format(confidence_level))) # define the table size ColWidths =[0.05, 0.6, 0.1, 0.1, 0.15] # add rank label Prodf = Prodf.assign(rank=np.arange(1, list_p_greater.shape[-1] + 1, 1)) # round importance Prodf[name_p_greater] = Prodf[name_p_greater].astype(float).round(4) # = df.value1.round() # re-sort columns to make rank labels coming first cols = Prodf.columns.tolist() cols = cols[-1:] + cols[:-1] Prodf = Prodf[cols] # take the first 5 high ranking channels dcsummary = Prodf.iloc[:5] # plot table on the top of the bar plot table = ax1.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='top', colWidths=ColWidths) #table = ax4.table(cellText=dcsummary.values, colLabels=dcsummary.columns, cellLoc='center', loc='center', colWidths=ColWidths) table.auto_set_font_size(False) table.set_fontsize(6) fig.subplots_adjust(hspace=0) output_path = os.path.join(output_dir, output_file) fig.savefig(output_path) plt.close() print('Completed Saving a plot of p_greater in frequency band VS channel') return None