Source code for tadlib.hitad.chromLev

# -*- coding: utf-8 -*-
"""
Created on Tue May 31 15:47:34 2016

@author: wxt
"""

from __future__ import division
import copy, collections
import numpy as np
from scipy import sparse
import matplotlib
matplotlib.use('agg')
from tadlib.hitad.aligner import BoundSet, DomainSet, DomainAligner, hierFormat, Container
from tadlib.calfea import analyze
from matplotlib.colors import Normalize

class MidpointNormalize(Normalize):
    def __init__(self, vmin=None, vmax=None, midpoint=None, clip=False):
        self.midpoint = midpoint
        Normalize.__init__(self, vmin, vmax, clip)

    def __call__(self, value, clip=None):
        if self.vmin == self.vmax:
            return np.ma.masked_array(np.interp(value, [self.vmin], [0.5]))
        
        if self.vmin < self.midpoint < self.vmax:
            x, y = [self.vmin, self.midpoint, self.vmax], [0, 0.5, 1]
        elif self.vmin >= self.midpoint:
            x, y = [self.vmin, self.vmax], [0.5, 1]
        elif self.vmax <= self.midpoint:
            x, y = [self.vmin, self.vmax], [0, 0.5]
            
        return np.ma.masked_array(np.interp(value, x, y))

np.seterr(divide = "ignore")

[docs]class Chrom(object): """ *Chrom* is defined to: - Hold Hi-C data within a certain chromosome - Identify hierarchical domains in 4 steps: 1.Calculate adaptive DIs. 2.Identify original candidate bounds by 5-state Gaussian mixture Hidden Markov Model using adaptive DIs as input. 3.Select TAD bounds from candidate bounds. 4.Recursively identify inner domain structures of each TAD. - Visualize any region of the chromosome. Hierarchical domains will be plotted as boxes along with the diagonal of the heatmap, and adaptive DI track will be placed on top of the heatmap. Parameters ---------- chrom : str Chromosome label. res : int Resolution of the Hi-C data in base-pair unit. hicdata : CSR sparse matrix Bin-level Hi-C matrix of the specified chromosome. replabel : str Biological replicate label. maxapart : int Maximum allowable TAD size in base-pair unit. (Default: 4000000) Attributes ---------- chrom : str Chromosome label. res : int Resolution in base-pair unit. maxapart : int Maximum allowable TAD size. replabel : str Biological replicate label. chromLen : int Total bin number of the chromosome. rawMatrix : sparse matrix in Compressed Sparse Row format CSR sparse matrix is used to extract Hi-C data by slicing conveniently while guarantee low memory overhead. """ defaultwindow = 2000000 minsize = 5 def __init__(self, chrom, res, hicdata, replabel, maxapart=4000000): self.chrom = chrom self.res = res self.maxapart = maxapart self.replabel = replabel self._rm = 1 self._dw = self.defaultwindow // res self.chromLen = hicdata.shape[0] self.hmm = None x, y = hicdata.nonzero() mat_ = hicdata[x, y] if isinstance(mat_, np.matrix): IF = np.array(mat_).ravel() else: # mat_ is a sparse matrix IF = np.array(mat_.todense()).ravel() IF[np.isnan(IF)] = 0 self.rawMatrix = self._genSparseMatrix(x, y, IF) del x, y, IF, hicdata self._state = 'Submitted' def _genSparseMatrix(self, x, y, IF): extendLen = self.chromLen + 2*self.chromLen rawMatrix = sparse.csr_matrix((IF, (x + self.chromLen, y + self.chromLen)), shape = (extendLen, extendLen)) return rawMatrix
[docs] def detectPeaks(self, trends, mph=0, mpd=5): """ Detect peaks (local maxima) in a 1-D array intuitively (a peak must be greater than its immediate neighbors). Parameters ---------- trends : 1-D numpy ndarray Data. mph : float Only peaks that are greater than this value will be detected. (Default: 0) mpd : positive integer Only peaks whose indices are at least separated by this value will be reported. (Default: 5) Returns ------- ind : 1-D numpy ndarray Indices of peaks detected in *trends*. """ dx = trends[1:] - trends[:-1] ind = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] sp = np.where(trends==1)[0] if sp.size: ind = np.r_[sp[-1], ind] if dx[-1] > 0: ind = np.r_[ind, trends.size-1] # Filter peaks by mph if ind.size: ind = ind[trends[ind] > mph] # Remove small peaks closer than mpd if ind.size and mpd > 1: ind = ind[np.argsort(trends[ind])][::-1] idel = np.zeros(ind.size, dtype = bool) for i in range(ind.size): if not idel[i]: idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \ & (trends[ind[i]] > trends[ind]) idel[i] = 0 ind = np.sort(ind[~idel]) return ind
[docs] def randomCheck(self, seq, pthre = 0.05): """ We use chi square test to test the randomness of a sequence by looking at the conversion frequency between neighbors in the sequence. Parameters ---------- seq : str A string containing only '1' or '0'. (e.g. '101000101') pthre : float, 0-1 Significance level of the hypothesis tests. Returns ------- reject : bool True if we should reject the null hypothesis (the sequence is generated randomly) under the selected significance level. """ from scipy.stats import chisquare pairwise = zip(seq[1:], seq[:-1]) d = collections.defaultdict(int) for k in pairwise: d[k] += 1 for k in [('0','0'), ('0','1'), ('1','0'), ('1','1')]: if not k in d: d[k] = 0 obs = np.array(list(d.values())) exp = np.ones_like(obs) * obs.mean() _, pval = chisquare(obs, exp) reject = pval<=pthre return reject
[docs] def oriWindow(self, P): """ Estimate the most appropriate window size for current bin to best capture the local interaction bias direction. See Also -------- tadlib.hitad.chromLev.Chrom.detectPeaks : detect peaks given a 1-D array tadlib.hitad.chromLev.Chrom.randomCheck : randomness test for a two-valued (0-1) sequence """ noise = P == 0 check = noise[:20] noiselevel = check.sum() / check.size if noiselevel > 0.6: return 0 indices = np.arange(1, P.size+1) m = [P < 0, # Downstream bias P > 0] # Upstream bias trends_1 = m[0].cumsum().astype(float) / np.arange(1, m[0].size + 1) trends_2 = m[1].cumsum().astype(float) / np.arange(1, m[1].size + 1) inds = [self.detectPeaks(trends_1, 0.5, 5), self.detectPeaks(trends_2, 0.5, 5)] pool = {} for i in [0, 1]: for p in inds[i]: pool[p] = i for p in sorted(pool): seq = ''.join([str(int(i)) for i in m[pool[p]][:(p+1)]]) tmp = indices[p]+self._rm+1 if tmp >= self.minsize: # hasn't been systematically tested if self.randomCheck(seq): return tmp return self._dw
[docs] def minWindows(self, start, end, maxw): """ Estimate best window size for each bin of a given range. Parameters ---------- start, end : int Specify range of the bin indices. maxw : int Maximum allowable window size. Attributes ---------- windows : 1-D numpy.ndarray, int32 See Also -------- tadlib.hitad.chromLev.Chrom.oriWindow : Window size estimation for a single bin. """ start += self.chromLen; end += self.chromLen self.windows = np.zeros(end - start, dtype = np.int32) for i in range(start, end): down = self.rawMatrix[i, i:(i+maxw)].toarray().ravel() up = self.rawMatrix[(i-maxw+1):(i+1), i].toarray().ravel()[::-1] down[:self._rm+1] = 0; up[:self._rm+1] = 0 diff = up - down ws = self.oriWindow(diff[self._rm+1:]) self.windows[i-start] = ws
def calIS(self, idx, w=5): idx = idx + self.chromLen sub = self.rawMatrix[idx-w:idx, idx+1:idx+w+1] return sub.mean() def preciseBound(self, byregion): for r in byregion: tmp = byregion[r] for d in tmp: si = d[0]//self.res ini = np.inf for i in range(max(si-1,0), min(si+2,self.chromLen)): IS = self.calIS(i) if IS < ini: d[0] = i * self.res ini = IS ei = d[1]//self.res ini = np.inf for i in range(max(ei-1,0), min(ei+2,self.chromLen)): IS = self.calIS(i) if IS < ini: d[1] = i * self.res ini = IS
[docs] def calDI(self, windows, start): """ Calculate Directionality Index (DI) for each bin with adaptive window size. Parameters ---------- windows : 1-D numpy.ndarray, int32 Returned by :py:meth:`tadlib.hitad.chromLev.Chrom.minWindows`. start : int Starting bin index, the window size of which is taken from the 1st place of *windows*. Attributes ---------- DIs : 1-D numpy ndarray, float Calculated adaptive DI array, which has the same size as the input *windows*. """ start = start + self.chromLen self.DIs = np.zeros(len(windows)) for i in range(start, start + len(windows)): w = windows[i-start] if w == 0: w = self._dw down = self.rawMatrix[i, i:(i+w)].toarray().ravel() up = self.rawMatrix[(i-w+1):(i+1), i].toarray().ravel()[::-1] down = down[self._rm+1:]; up = up[self._rm+1:] tmp = self._binbias(up, down) if tmp != 0: self.DIs[i-start] = tmp else: if w < self._dw: w = self._dw down = self.rawMatrix[i, i:(i+w)].toarray().ravel() up = self.rawMatrix[(i-w+1):(i+1), i].toarray().ravel()[::-1] down = down[self._rm+1:]; up = up[self._rm+1:] self.DIs[i-start] = self._binbias(up, down) # trim outliers lthre = np.percentile(self.DIs[self.DIs<0], 0.1) hthre = np.percentile(self.DIs[self.DIs>0], 99.9) self.DIs[self.DIs<lthre] = lthre self.DIs[self.DIs>hthre] = hthre
def _binbias(self, up, down): bias = 0 zeromask = (up != 0) & (down != 0) if zeromask.sum() >= 5: up = up[zeromask]; down = down[zeromask] if up.size <= 1: return bias upmean = up.mean(); downmean = down.mean() SD_1 = np.sum((up - upmean) ** 2) / (up.size * (up.size - 1)) SD_2 = np.sum((down - downmean) ** 2) / (down.size * (down.size - 1)) SD_pool = np.sqrt(SD_1 + SD_2) if SD_pool != 0: bias = (upmean - downmean) / SD_pool return bias
[docs] def splitChrom(self, DIs): """ Split a chromosome into gap-free regions. HMM learning and domain identification procedures will be performed on these regions separately. Parameters ---------- DIs : 1-D numpy ndarray, float Adaptive DI array of the whole chromosome. Generally, we detect runs of zeros in the array as gaps, which will be cut off the chromosome, making entire chromosome pieces of gap-free regions. Attributes ---------- chromRegions : dict, {(start,end):DIs[start:end]} The keys are gap-free regions, and the values are corresponding adaptive DI pieces. gapbins : set Set of bins (in base-pair unit) located in gap regions. """ # minregion and maxgaplen are set intuitively maxgaplen = max(100000 // self.res, 5) minregion = maxgaplen * 2 valid_pos = np.where(DIs != 0)[0] gapsizes = valid_pos[1:] - valid_pos[:-1] endsIdx = np.where(gapsizes > (maxgaplen + 1))[0] startsIdx = endsIdx + 1 chromRegions = {} for i in range(startsIdx.size - 1): start = valid_pos[startsIdx[i]] end = valid_pos[endsIdx[i + 1]] + 1 if end - start > minregion: chromRegions[(start, end)] = DIs[start:end] if startsIdx.size > 0: start = valid_pos[startsIdx[-1]] end = valid_pos[-1] + 1 if end - start > minregion: chromRegions[(start, end)] = DIs[start:end] start = valid_pos[0] end = valid_pos[endsIdx[0]] + 1 if end - start > minregion: chromRegions[(start, end)] = DIs[start:end] if not startsIdx.size: if valid_pos.size > 0: start = valid_pos[0] end = valid_pos[-1] if end - start > minregion: chromRegions[(start, end)] = DIs[start:end] gapmask = np.ones(DIs.size, bool) for r in chromRegions: gapmask[r[0]:r[1]] = 0 gapbins = set(np.where(gapmask)[0]*self.res) self.regionDIs, self.gapbins = chromRegions, gapbins
[docs] def viterbi(self, seq): """ Find the most likely hidden state series using the viterbi algorithm. Parameters ---------- seq : 1-D numbpy ndarray, float Adaptive DI array for any region. Returns ------- path : list List of hidden state labels. Has the same length as the input *seq*. """ path = [int(s.name) for i, s in self.hmm.viterbi(seq)[1][1:-1]] return path
def _getBounds(self, path, junctions=['30']): """ Call boundary sites from hidden state series. By default, these transition modes will be detected as boundaries: "no bias(2) > domain start(0)", "domain end(4) > domain start(0)", and "domain end(4) > no bias(2)". Parameters ---------- path : list Hidden state series returned by :py:meth:`tadlib.hitad.chromLev.Chrom.viterbi`. junctions : list of strings Boundary definitions by using state transition modes. (Default: ['20','40','42']) Returns ------- bounds : 1-D numpy ndarray, int Detected boundary positions in ascending order. 0 and len(*path*) will always be included. """ pathseq = ''.join(map(str, path)) pieces = [pathseq] for junc in junctions: gen = [] for seq in pieces: tmp = seq.split(junc) if len(tmp) == 1: gen.extend(tmp) else: gen.append(tmp[0]+junc[0]) for s in tmp[1:-1]: gen.append(junc[1]+s+junc[0]) gen.append(junc[1]+tmp[-1]) pieces = gen bounds = np.r_[0, np.r_[list(map(len, pieces))]].cumsum() return bounds
[docs] def pipe(self, seq, start): """ Transform an observed sequence into a list of domains. Parameters ---------- seq : 1-D numbpy ndarray, float Adaptive DI array for any region. start : int Chromosome bin index of the *seq* start. Returns ------- domains : list List of domains in the format ``[start bin, end bin, noise level, hierarchical level]``. See Also -------- tadlib.hitad.chromLev.Chrom.refNoise : Calculate the noise level of a given domain tadlib.hitad.aligner.BoundSet : where the meanings of the hierarchical level labels are explained in detail. """ # bin-level domain (not base-pair-level domain!) bounds = self._getBounds(self.viterbi(seq), junctions=['30']) pairs = [[bounds[i], bounds[i+1]] for i in range(len(bounds)-1)] domains = [] for b in pairs: # start, end, noise level, hierarchical level tmp = [b[0]+start, b[1]+start, 0, 0] domains.append(tmp) return domains
[docs] def minCore(self, regionDIs): """ Output domain list for each gap-free region. Parameters ---------- regionDIs : dict Gap-free regions and corresponding adaptive DI arrays. Returns ------- minDomains : dict Gap-free regions and corresponding identified bottom domain list. Different from :py:meth:`tadlib.hitad.chromLev.Chrom.pipe`, the start and the end of a domain are in base-pair unit. """ tmpDomains = {} for region in sorted(regionDIs): seq = regionDIs[region] domains = self.pipe(seq, region[0]) cr = (region[0]*self.res, region[1]*self.res) tmpDomains[cr] = [] for domain in domains: domain[0] = domain[0] * self.res domain[1] = domain[1] * self.res domain[2] = self.refNoise(domain) tmpDomains[cr].append(domain) minDomains = self._orifilter(tmpDomains) return minDomains
[docs] def getDomainList(self, byregion): """ Combine by-region domains into a single list. Parameters ---------- byregion : dict The keys are tuples representing gap-free regions of the chromosome, and the values are corresponding identified domain lists. Returns ------- DomainList : list A merged domain list of all regions """ DomainList = [] for region in sorted(byregion): DomainList.extend(byregion[region]) return DomainList
def _orifilter(self, oriDomains): """ Perform size filtering on the input domain lists. Parameters ---------- oriDomains : dict The keys are tuples representing gap-free regions of the chromosome, and the values are corresponding identified domain lists. Start and end of the domain should be in base-pair unit. Returns ------- filtered : dict Pairs of gap-free regions and corresponding filtered domain lists. """ filtered = {} for region in oriDomains: tmplist = [] for d in oriDomains[region]: if d[1] - d[0] >= (self.minsize*self.res): tmplist.append(d) if len(tmplist): filtered[region] = tmplist return filtered
[docs] def iterCore(self, minDomains, tmpDomains): """ Calculate the mismatch ratio for the input two domain lists. Return 1 if *minDomains* is empty. :py:meth:`tadlib.hitad.chromLev.Chrom.oriIter` uses this method to determine whether to break the iteration. Parameters ---------- minDomains : dict Target domains calculated by the last loop. tmpDomains : dict Query domains returned by the current loop. Returns ------- tol : float Mismatch ratio. """ tmplist = self.getDomainList(copy.deepcopy(minDomains)) reflist = [] for refd in tmplist: reflist.append([self.chrom, refd[0], refd[1], 0]) if not len(reflist): tol = 1 else: tmplist = self.getDomainList(copy.deepcopy(tmpDomains)) alignlist = [] for alignd in tmplist: alignlist.append([self.chrom, alignd[0], alignd[1], 0]) Ref = DomainSet('ref', reflist, self.res) Align = DomainSet('align', alignlist, self.res) worker = DomainAligner(Ref, Align) worker.align('ref','align') count = len(worker.conserved('ref','align')) tol = 1 - count / len(Ref.Domains) return tol
[docs] def oriIter(self, minDomains): """ Iteratvely approximate adaptive window sizes and return the final bottom domain list which will be used in subsequent procedures. For each loop, window sizes are updated according to the latest bottom domains and next loop will re-run the identification pipeline using new window sizes. The iteration terminates if domains between two consecutive loops are very similar (estimated by :py:meth:`tadlib.hitad.chromLev.Chrom.iterCore`). Parameters ---------- minDomains : dict Initial domains served as the target domain list for :py:meth:`tadlib.hitad.chromLev.Chrom.iterCore` at the first iteration. We set it empty in our pipeline. Attributes ---------- minDomains : dict The keys are tuples representing gap-free regions of the chromosome, and the values are corresponding identified bottom domain lists. Start and end of the domain are in base-pair unit. See Also -------- tadlib.hitad.chromLev.Chrom.calDI : calculate DI values according to input window sizes tadlib.hitad.chromLev.Chrom.iterCore : estimate the degree of divergence between two domain lists """ for n_i in range(5): tmpDomains = self.minCore(self.regionDIs) tol = self.iterCore(minDomains, tmpDomains) minDomains = tmpDomains for region in minDomains: for d in minDomains[region]: ds = d[0]//self.res; de = d[1]//self.res Len = de - ds ws = np.max(np.r_['0,2,1', np.arange(1,Len+1), np.arange(Len,0,-1)], axis=0) self.windows[ds:de] = ws self.calDI(self.windows, 0) self.splitChrom(self.DIs) if tol < 0.05: break self.minDomains = minDomains
[docs] def fineDomain(self): """ Identify hierarchical domains within each TAD. Attributes ---------- hierDomains : dict The keys are tuples representing gap-free regions of the chromosome, and the values are corresponding identified hierarchical domain lists. Start and end of the domain are in base-pair unit. See Also -------- tadlib.hitad.chromLev.Chrom.maxCore : identify TADs tadlib.hitad.chromLev.Chrom.subDomains : resolve domain hierarchy within a given TAD """ self.hierDomains = {} for region in sorted(self.maxDomains): rdomains = self.maxDomains[region] if not len(rdomains): continue hdomains = [] for i in range(len(rdomains)): top = rdomains[i][:2] subdomains = {} self.subDomains(top, self.minDomains[region], subdomains=subdomains) if len(subdomains)==1: noise = self.refNoise(top) hdomains.append(top+[noise,0]) else: for d in subdomains: noise = self.refNoise(list(d)) hdomains.append(list(d)+[noise,subdomains[d]]) hdomains.sort() self.hierDomains[region] = hdomains
[docs] def callDomain(self): """ Direct API for our hierarchical domain identification pipeline: - Adaptively estimate window size for each bin. (:py:meth:`tadlib.hitad.chromLev.Chrom.minWindows`) - Calculate adaptive DIs. (:py:meth:`tadlib.hitad.chromLev.Chrom.calDI`) - Iteratively correct adaptive window size and bottom boundary positions. (:py:meth:`tadlib.hitad.chromLev.Chrom.oriIter`) - Identify TADs based on bottom domains. (:py:meth:`tadlib.hitad.chromLev.Chrom.maxCore`) - Resolve domain hierarchy within each TAD. (:py:meth:`tadlib.hitad.chromLev.Chrom.fineDomain`) """ self.minWindows(0, self.chromLen, self._dw) self.calDI(self.windows, 0) self.splitChrom(self.DIs) self.oriIter({}) self.maxCore() self.fineDomain() self.preciseBound(self.hierDomains) self._state = 'Completed'
[docs] def idxmatch(self, domain): """ Pair interactions of the given domain with the upstream and downstream interactions under the same genomic distance. Parameters ---------- domain : [start, end] Domain interval in base-pair unit. Returns ------- cur_store : tuple, (x-coordinates, y-coordinates, interaction frequencies) Interactions within the given domain. up_store : tuple, (x-coordinates, y-coordinates, interaction frequencies) Corresponding upstream interctions. down_store : tuple, (x-coordinates, y-coordinates, interaction frequencies) Corresponding downstream interactions. """ ds = domain[0] // self.res + self.chromLen de = domain[1] // self.res + self.chromLen if (de - ds) < 2: return Rawx = np.tile(np.r_['0,2,0', ds:de], (1, de - ds)) Rawy = np.tile(np.r_['0,2,1', ds:de], (de - ds, 1)) d = Rawy - Rawx Ux = Rawx - d Uy = Rawx Dx = Rawy Dy = Dx + d curInters = self.rawMatrix[Rawx, Rawy].toarray() upInters = self.rawMatrix[Ux, Uy].toarray() downInters = self.rawMatrix[Dx, Dy].toarray() cur_store = (Rawx, Rawy, curInters) up_store = (Ux, Uy, upInters) down_store = (Dx, Dy, downInters) return up_store, cur_store, down_store
[docs] def stablescore(self, domain, bases=[]): """ Calculate TAD score for the given domain. Parameters ---------- domain : [start, end] Domain interval in base-pair unit. bases : list List of bottom domains within the given domain region. Returns ------- inout : float TAD score defined as the enrichment between intra-domain interaction frequencies and inter-domain interaction frequencies controlling for the impact of genomic distance. See Also -------- tadlib.hitad.chromLev.Chrom.idxmatch : Pair intra-domain and inter-domain interactions with the same genomic distance tadlib.calfea.analyze.Core.longrange : Assign the weight value for each intra-domain interaction according to the genomic distance and the local interaction background. """ ori = domain[0] // self.res + self.chromLen end = domain[1] // self.res + self.chromLen P = self.idxmatch(domain) if P is None: return 0 ## Compare inside-domain interactions with outside ones, larger, better inside = np.r_[[]]; outside = np.r_[[]] # Pair with upstream interactions Umask = ((2*P[1][0] - ori) <= P[1][1]) & (P[1][1] - P[1][0] > self._rm) &\ (P[1][2] != 0) & (P[0][2] != 0) inside = np.r_[inside, P[1][2][Umask]] outside = np.r_[outside, P[0][2][Umask]] # Pair with downstream interactions Dmask = ((2*P[2][0] - ori) >= P[2][1]) & (P[2][1] - P[2][0] > self._rm) &\ (P[2][1] >= end) & (P[1][2] != 0) & (P[2][2] != 0) inside = np.r_[inside, P[1][2][Dmask]] outside = np.r_[outside, P[2][2][Dmask]] inout = 0 if inside.size > 3: weights = 0.5 if len(bases): M = self.getSelfMatrix(domain[0], domain[1]) newM, convert = analyze.manipulation(M) if newM.shape[0] > 7: # 2*ww + 1 work = analyze.Core(M) work.longrange(pw=1, ww=3) fE = work.convertMatrix(work.fE) for d in bases: ds = (d[0]-domain[0])//self.res de = (d[1]-domain[0])//self.res fE[ds:de,ds:de] = 0 pkv = fE[fE>0] if pkv.size > 0: norm = MidpointNormalize(vmin=np.percentile(pkv,1), vmax=np.percentile(pkv,99), midpoint=1) W = norm(fE).data W[fE==0] = 0.5 weights = np.r_[W[Umask], W[Dmask]] diff = (inside - outside) / (inside + outside) * weights inout = diff.mean() return inout
def _updateCache(self, cache, key, bases=[]): if not key in cache: cache[key] = self.stablescore(key, bases)
[docs] def scoreCache(self, domainlist, cache = {}): """ Calculate and cache the TAD scores for any combinations of consecutive bottom domains. Parameters ---------- domainlist : list of [start,end] List of bottom domain intervals in base-pair unit. cache : dict Cache of the TAD scores. The keys are in (start,end) format, intervals of continuous regions, and the values are corresponding calculated scores. See Also -------- tadlib.hitad.chromLev.Chrom.stablescore : Calculate TAD score of any genomic interval """ for i in range(len(domainlist)): self._updateCache(cache, tuple(domainlist[i][:2])) if domainlist[i][2] > 0.2:# Noise domains will stay alone continue for j in range(i+1, len(domainlist)): if (domainlist[j][1] - domainlist[i][0] > self.maxapart): break if domainlist[j][2] > 0.2: continue key = (domainlist[i][0], domainlist[j][1]) bases = [d[:2] for d in domainlist[i:j+1]] self._updateCache(cache, key, bases)
[docs] def maxscorepath(self, domainlist, cache): """ An implementation for our proposed algorithm to find the best separation solution at chromosomal/domain level, given bottom domain list and pre-computed scores. Parameters ---------- domainlist : list of [start,end] List of bottom domain intervals in base-pair unit. cache : dict Pre-computed scores for any combinations of continuous bottom domains. The keys are intervals of continuous regions in (start,end) format, and the values are corresponding scores. Returns ------- bests : set of (sidx, eidx) Each element indicates one merged domain of the solution, represented as (start index, end index) of the input *domainlist*. """ # score cache uses domain coordinate intervals scores = cache # Dynamic programming # paths uses domain index intervals paths = {} for i in range(len(domainlist)): Len = domainlist[i][1]//self.res - domainlist[i][0]//self.res paths[i] = {} pre = paths.get(i-1, {(-1,-1): [0,None]}) maxp = None; maxs = float('-inf') for pp in pre: if i <= pp[1]: unit = scores[(domainlist[pp[0]][0],domainlist[pp[1]][1])] paths[i][pp] = [pre[pp][0]+Len*unit, pp] else: if pre[pp][0] > maxs: maxs = pre[pp][0]; maxp = pp for j in range(i, len(domainlist)): key = (domainlist[i][0], domainlist[j][1]) if (key[1] - key[0] > self.maxapart) and (j > i): break tryget = scores.get(key, None) if tryget is None: continue unit = scores[(domainlist[i][0],domainlist[j][1])] paths[i][(i,j)] = [maxs+Len*unit, maxp] # Backtacking lastp = None; lasts = float('-inf') for pp in paths[len(domainlist)-1]: if paths[len(domainlist)-1][pp][0] > lasts: lasts = paths[len(domainlist)-1][pp][0] lastp = pp if lastp is None: return bestpath = [lastp] for i in range(len(domainlist))[::-1]: lastp = paths[i][lastp][1] if lastp == (-1,-1): break bestpath = [lastp] + bestpath bests = set(bestpath) return bests
[docs] def maxCore(self, cache={}): """ Perform TAD identification. We define TADs as domains to optimize chromosome separation(based on some objective function), which is solved by using an algorithm like dynamic programming implemented in :py:meth:`tadlib.hitad.chromLev.Chrom.maxscorepath`. Parameters ---------- cache : dict TAD scores for all combinations of consecutive bottom domains will be pre-computed and stored in this dict. The keys are tuples (start, end) representing merged domain intervals. (Default: {}) Attributes ----------- maxDomains : dict Optimized TADs. cache : dict Cached TAD scores. See Also -------- tadlib.hitad.chromLev.Chrom.scoreCache : Pre-compute TAD scores for all combinations of consecutive bottom domains. tadlib.hitad.chromLev.Chrom.maxscorepath : find the best TAD list """ maxDomains = {} for region in sorted(self.minDomains): rdomains = self.minDomains[region] if not len(rdomains): continue if len(rdomains) < 2: maxDomains[region] = rdomains continue self.scoreCache(rdomains, cache) premerge = self.maxscorepath(rdomains, cache) newdomain = [] for b in sorted(premerge): tl = rdomains[b[0]:b[1]+1] newdomain.append([tl[0][0], tl[-1][1], 0, 0]) newdomain.sort() if len(newdomain): maxDomains[region] = newdomain self.maxDomains = self._orifilter(maxDomains) self.cache = cache
[docs] def subDomains(self, domain, reflist, clv = 0, aM=None, W=None, subdomains={}): """ A recusive method (function) to identify inner domain hierarchy of a TAD (or a domain). Sub-TADs of each level are defined as the best separation of the outer domain. Parameters ---------- domain : [start, end] Outer layer domain interval in base-pair unit. reflist : list List of bottom domains within the region of the outer domain. clv : int Global domain level of the outer domain. The TADs have the level 0, the sub-TADs within a TAD have the level 1, and sub-sub-TADs within a sub-TAD have the level 2, and so forth. (Default: 0) aM : 2-D numpy ndarray or None Arrowhead matrix of the outer domain. (Default: None) W : 2-D numpy ndarray or None Weight matrix corresponding to the contact matrix of the outer domain. See :py:meth:`tadlib.hitad.chromLev.Chrom.getWeightMatrix` for detailed calculation. (Default: None) subdomains : dict A container for domains of all hierarchy. The keys are domain intervals in base-pair unit, and values are corresponding global levels. See Also -------- tadlib.hitad.chromLev.Chrom.maxscorepath : find the best domain list optimally separating domain-level interactions. """ def localUpdate(key): ori = key[0]//self.res - domain[0]//self.res end = key[1]//self.res - domain[0]//self.res mask1 = common & (yidx >= end) & (xidx < end) & ((2*xidx - ori) >= yidx) mask2 = common & (yidx < end) & (xidx >= ori) & (2*xidx >= yidx) &\ ((2*xidx - ori) <= yidx) part1 = aM[mask1] part2 = aM[mask2] merge = np.r_[part1, -part2] if merge.size > 3: tx,ty = np.where(mask1) weights = np.r_[W[2*tx-ty,tx], W[mask2]] diff = merge * weights biases[key] = diff.mean() else: biases[key] = 0 def getsublist(top, reflist): sublist = [d for d in reflist if ((d[0]>=top[0])and(d[1]<=top[1]))] sublist.sort() return sublist subdomains[(domain[0], domain[1])] = clv if aM is None: aM = self.toArrowhead(domain[0], domain[1]) sublist = getsublist((domain[0], domain[1]), reflist) if W is None: W = self.getWeightMatrix(domain[0], domain[1], sublist) xidx, yidx = np.indices(aM.shape) common = ((yidx - xidx) > self._rm) & (aM != 1) & (aM != -1) biases = {} if len(sublist) <= 1: return for i in range(len(sublist)): key = (sublist[i][0], sublist[i][1]) localUpdate(key) for j in range(i+1, len(sublist)): key = (sublist[i][0], sublist[j][1]) localUpdate(key) biases[(domain[0], domain[1])] = 0 prebests = self.maxscorepath(sublist, biases) #---------------------------------------------------------------------- prebests = sorted(prebests) dlist = [sublist[0][0]] for i in range(len(prebests)-1): dlist.append(sublist[prebests[i][1]][1]) dlist.append(sublist[-1][1]) if len(dlist) == 2: return #---------------------------------------------------------------------- for i in range(len(dlist)-1): nlv = clv + 1 tmpdomain = (dlist[i], dlist[i+1]) tmpori = tmpdomain[0]//self.res - domain[0]//self.res tmpend = tmpdomain[1]//self.res - domain[0]//self.res taM = aM[tmpori:tmpend, tmpori:tmpend] tW = W[tmpori:tmpend, tmpori:tmpend] self.subDomains(tmpdomain, sublist, clv=nlv, aM=taM, W=tW, subdomains=subdomains)
[docs] def getWeightMatrix(self, start, end, bases=[]): """ Calculate weights for each intra-domain interaction by considering the genomic distance and the local interaction background. Parameters ---------- start, end : int The domain interval in base-pair unit. bases : list List of the bottom domains within the given interval. Returns ------- W : 2-D numpy.ndarray The weight matrix. (An upper triangular matrix) """ M = self.getSelfMatrix(start, end) W = np.ones(M.shape) * 0.5 newM, convert = analyze.manipulation(M) if newM.shape[0] > 7: # 2*ww + 1 work = analyze.Core(M) work.longrange(pw=1, ww=3) fE = work.convertMatrix(work.fE) for d in bases: ds = (d[0]-start)//self.res de = (d[1]-start)//self.res fE[ds:de,ds:de] = 0 pkv = fE[fE>0] if pkv.size > 0: norm = MidpointNormalize(vmin=np.percentile(pkv,1), vmax=np.percentile(pkv,99), midpoint=1) W = norm(fE).data W[fE==0] = 0.5 return W
[docs] def getSelfMatrix(self, start, end): """ Return the contact matrix of any given region. Parameters ---------- start, end : int The region interval in base-pair unit. Returns ------- Matrix : 2-D numpy ndarray, float Sub contact matrix. """ startidx = start // self.res + self.chromLen endidx = end // self.res + self.chromLen Matrix = self.rawMatrix[startidx:endidx, startidx:endidx].toarray() x, y = np.nonzero(Matrix) Matrix[y, x] = Matrix[x, y] return Matrix
[docs] def refNoise(self, domain): """ Return noise level of a domain, which is simply defined as the zero entry ratio of the contact matrix. """ if domain[1] - domain[0] < self.res*self.minsize: return 0 matrix = self.getSelfMatrix(domain[0], domain[1]) total = matrix.size - np.arange(len(matrix), len(matrix)-self._rm-1, -1).sum()*2 +\ len(matrix) if total < 5: return 0 else: nx, ny = np.nonzero(matrix) mask = np.abs(ny-nx) > self._rm sigNum = mask.sum() # Number of nonzero entries apart from diag return 1-sigNum/total
[docs] def toArrowhead(self, start, end): """ Perform Arrowhead transformation on contact matrix of a given genomic region. Parameters ---------- start, end : int The region interval. Returns ------- A : 2-D numpy ndarray, float Transformed matrix. (A symmetric matrix) """ startidx = start // self.res + self.chromLen endidx = end // self.res + self.chromLen maxd = endidx - startidx - 1 forward = startidx - maxd raw = self.rawMatrix[forward:endidx, forward:endidx].toarray() tx, ty = np.nonzero(raw) raw[ty, tx] = raw[tx, ty] N = maxd + 1 x, y = np.triu_indices(N) rawx = x + maxd rawy = y + maxd d = y - x minusY = rawx - d denominator = raw[minusY, rawx] + raw[rawx, rawy] denominator[denominator==0] = 1 elemA = (raw[minusY, rawx] - raw[rawx, rawy]) / denominator A = np.zeros((N, N)) A[x, y] = elemA A[y, x] = elemA return A
[docs] def plot(self, start, end, Domains, figname, arrowhead = False, vmin=None, vmax=None): """ Given a genomic region and a domain list, plot corresponding contact heatmap and all domains (represented as diagonal squares) within the region. Parameters ---------- start, end : int The region interval. Domains : dict The keys are tuples representing gap-free regions, and values are corresponding identified domains. figname : str or None If not None, the figure will be saved, otherwise it will only be shown in an interactive window. (Default: None) arrowhead : bool If True, the Arrowhead transformed matrix will be plotted instead of the raw contact matrix. (Default: False) """ import matplotlib.pyplot as plt from matplotlib.colors import LinearSegmentedColormap def caxis_H(ax): """ Axis control for the heatmap. """ ax.yaxis.set_ticks_position('left') ax.xaxis.set_ticks_position('bottom') ax.tick_params(axis = 'both', labelsize = 12, length = 5, pad = 7) def caxis_S(ax, color): """ Axis control for the DI track. """ for spine in ['right', 'top']: ax.spines[spine].set_visible(False) ax.tick_params(axis = 'both', bottom = False, top = False, left = False, right = False, labelbottom = False, labeltop = False, labelleft = False, labelright = False) ax.spines['left'].set_lw(1.5) ax.spines['left'].set_color('#1B9E77') ax.spines['left'].set_alpha(0.9) ax.spines['left'].set_linestyle('dotted') ax.spines['bottom'].set_lw(0.5) ax.spines['bottom'].set_color(color) ax.spines['bottom'].set_alpha(0.9) def properU(pos): """ Express a genomic position in proper unit (KB or MB). """ i_part = int(pos) // 1000000 # Integer Part d_part = (int(pos) % 1000000) // 1000 # Decimal Part if (i_part > 0) and (d_part > 0): return ''.join([str(i_part), 'M', str(d_part), 'K']) elif (i_part == 0): return ''.join([str(d_part), 'K']) else: return ''.join([str(i_part), 'M']) def boundDrawer(ax, regions, startidx, color = '#A5ACAF'): pairs = [(bi[0] - startidx, bi[1] - startidx) for bi in regions] for corner in pairs: if (corner[0] <= 0): ax.plot([0, corner[1]], [corner[1], corner[1]], color = color, linewidth = 1.5) ax.plot([corner[1], corner[1]], [0, corner[1]], color = color, linewidth = 1.5) elif (corner[1] >= interval): ax.plot([corner[0], corner[0]], [corner[0], interval], color = color, linewidth = 1.5) ax.plot([corner[0], interval], [corner[0], corner[0]], color = color, linewidth = 1.5) else: ax.plot([corner[0], corner[0]], [corner[0], corner[1]], color = color, linewidth = 1.5) ax.plot([corner[0], corner[1]], [corner[0], corner[0]], color = color, linewidth = 1.5) ax.plot([corner[0], corner[1]], [corner[1], corner[1]], color = color, linewidth = 1.5) ax.plot([corner[1], corner[1]], [corner[0], corner[1]], color = color, linewidth = 1.5) # Basic Settings matplotlib.rcParams['xtick.direction'] = 'out' matplotlib.rcParams['ytick.direction'] = 'out' size = (12, 11) width = 0.618; Left = (1 - width) / 2 HB = 0.1; HH = width * size[0] / size[1] # HeatMap Bottom / HeatMap Height SB = HB + HH # Bottom of tracks ST = 0.9 # The Top of tracks SH = (ST - SB) / 2 raw_cmap = LinearSegmentedColormap.from_list('interaction', ['#FFFFFF','#CD0000']) arrow_cmap = plt.cm.RdBu_r DI_color = '#1B9E77' fig = plt.figure(figsize = size) if arrowhead: Matrix = self.toArrowhead(start, end) norm = MidpointNormalize(vmin=Matrix.min(), midpoint=0, vmax=Matrix.max()) Params = {'norm': norm, 'cmap': arrow_cmap} else: Matrix = self.getSelfMatrix(start, end) nonzero = Matrix[np.nonzero(Matrix)] if vmin is None: vmin = 0 else: vmin = vmin if vmax is None: vmax = np.percentile(nonzero, 95) else: vmax = vmax Params = {'vmin': vmin, 'vmax': vmax, 'cmap': raw_cmap} startidx = start // self.res endidx = end // self.res interval = Matrix.shape[0] ax = fig.add_axes([Left, HB, width, HH]) sc = ax.imshow(Matrix, aspect = 'auto', interpolation = 'none', extent = (0, interval, 0, interval), origin = 'lower', **Params) xlim = ax.get_xlim() ylim = ax.get_ylim() ticks = list(np.linspace(0, interval, 6).astype(int)) pos = [(start + t * self.res) for t in ticks] labels = [properU(i) for i in pos] Domains = np.r_[self.getDomainList(Domains)] Domains = np.r_['1,2,0', Domains[:,0], Domains[:,1], Domains[:,-1]] Domains[:,0] = Domains[:,0] // self.res Domains[:,1] = Domains[:,1] // self.res # Top-level mask = (Domains[:,2] == 0) & (Domains[:,1] > startidx) & (Domains[:,0] < endidx) extract = Domains[mask] boundDrawer(ax, extract, startidx, color = '#60636A') # Lower-level mask = (Domains[:,1] > startidx) & (Domains[:,0] < endidx) & (Domains[:,2] > 0) extract = Domains[mask] boundDrawer(ax, extract, startidx, color = '#A5ACAF') ax.set_xticks(ticks) ax.set_xticklabels(labels) ax.set_yticks(ticks) ax.set_yticklabels(labels) ax.set_xlim(xlim) ax.set_ylim(ylim) caxis_H(ax) axc = fig.add_axes([Left + width + 0.08, HB, 0.03, HH]) fig.colorbar(sc, cax = axc) track = self.DIs[startidx:endidx] ax = fig.add_axes([Left, SB, width, SH]) ax.fill_between(np.arange(track.size), track, color = DI_color, edgecolor = 'none') ax.set_ylabel('Adaptive DI', labelpad = 43, rotation = 'horizontal', style = 'italic', size = 12) ax.set_xlim(xlim) caxis_S(ax, DI_color) if figname != None: fig.savefig(figname, bbox_inches='tight') plt.close(fig) else: plt.show()
[docs]class MultiReps(DomainAligner): """ We define *MultiReps* to: - Hold Hi-C data of the same chromosome from different biological replicates at the same time - Provide an interface to identify hierarchical domains by using different replicate data independently and maintain the reproducible domains to form the final domain list. Parameters ---------- chrom : str Chromosome label. res : int Hi-C data resolution in base-pair unit. datasets : dict The keys are unique replicate labels, and the values are constructed *Chrom* objects by using the Hi-C data of corresponding biological replicates. """ def __init__(self, chrom, res, datasets): self.chrom = chrom self.res = res self.Queue = datasets # key: rep label, value: Chrom object self._state = 'Unfinished'
[docs] def getDomainList(self, byregion): """ Combine by-region domains into a single list. Each domain in the returned list is represented in the format ``[chromosome label, start bp, end bp, hierarchical level]``. """ domainlist = [] for region in sorted(byregion): tmp = byregion[region] for d in tmp: if d[1] - d[0] >= 3*self.res: domainlist.append([self.chrom]+d[:2]+[d[-1]]) return domainlist
[docs] def callDomain(self): """ Find reproducible domains between replicates by using our domain-level alignment strategy. """ reps = sorted(self.Queue) tl = self.getDomainList(self.Queue[reps[0]].hierDomains) tg = DomainSet(reps[0], tl, self.res) for rep in reps[1:]: ql = self.getDomainList(self.Queue[rep].hierDomains) qy = DomainSet(rep, ql, self.res) tl = self._interp(self.reproducible(tg, qy)) tg = DomainSet('mtg', tl, self.res) pool = set() self._correctDomainTree(tg.Domains, pool) self.mergedDomains = hierFormat(pool) self._state = 'Completed'
def _correctDomainTree(self, subtree, pool, cur=None, ref_s=None, ref_e=None): for node in subtree: chrom, start, end, label = node if (not ref_s is None) and (not ref_e is None) and (not cur is None): if start - cur[1] <= 2*self.res: start = ref_s if cur[2] - end <= 2*self.res: end = ref_e pool.add((chrom,start,end)) self._correctDomainTree(subtree[node], pool, node, start, end)
[docs] def reproducible(self, tg, qy): """ Subprocess of *callDomain* for replicate alignment and reproducibility determination. """ work = repAligner(tg, qy) tcache = work._align(tg, qy) qcache = work._align(qy, tg) source = {} tbase = self._customize(tcache, qcache, tg, qy, source) qbase = self._customize(qcache, tcache, qy, tg, source) pool = set() self._extract(tbase, pool, tg) self._extract(qbase, pool, qy) dlist = hierFormat(pool) return dlist
def _iscompatible(self, dlist, ref): dlist.sort() lvs = [] for d in dlist: lvs.append(ref.boundclass[(d[0], d[1])]) lvs.append(ref.boundclass[(d[0], d[2])]) if len(lvs) <= 2: return True else: inlv = min(lvs[1:-1]) blv = max(lvs[0], lvs[-1]) return inlv >= blv def _correct_core(self, forward, backward, pool, tg, qy, me): for lv in forward: for p in forward[lv]: tl, ql = forward[lv][p].info tcheck = self._iscompatible(tl, tg) qcheck = self._iscompatible(ql, qy) if (not tcheck) or (not qcheck): if me: pool[lv][p] = Container([tl,ql,0]) continue found = self._search(backward, p) if found: if me: pool[lv][p] = Container([tl,ql,1]) else: mlv = min(set([d[-1] for d in tl])) if mlv==0: if me: pool[lv][p] = Container([tl,ql,0]) continue if len(ql)==0: if me: pool[lv][p] = Container([tl,ql,0]) continue ck = [ql[0][1],ql[-1][2]] valid = True for blv in backward: for bp in backward[blv]: q = list(bp[0][1:3]) if ((ck[0]<q[0]<ck[1]) and (q[1]>ck[1])) or \ ((ck[0]<q[1]<ck[1]) and (q[0]<ck[0])): valid = False break if not valid: break if valid: if me: pool[lv][p] = Container([tl,ql,1]) else: tl, ql = ql, tl mlv = min(set([d[-1] for d in tl])) if not mlv in pool: pool[mlv] = {p[::-1]:Container([tl,ql,1])} else: pool[mlv][p[::-1]] = Container([tl,ql,1]) else: if me: pool[lv][p] = Container([tl,ql,0]) def _correct_pairs(self, forward, backward, tg, qy): pool = copy.deepcopy(forward) self._correct_core(forward, backward, pool, tg, qy, True) self._correct_core(backward, forward, pool, qy, tg, False) return pool def _customize(self, cache, ref, tg, qy, source): tcore = {} for k in sorted(cache): if k[1] is None: continue sk = k[::-1] if not sk in ref: continue if sk in source: continue hit = ref[sk] target = cache[k] tl, ql = target.info if (len(tl)==1) and (len(ql)>1): # wait for reverse alignment continue target = self._correct_pairs(target, hit, tg, qy) tcore[k] = Container(target.info[:2]) for lv in target: for p in target[lv]: tl, ql, label = target[lv][p].info tlvs = set([d[-1] for d in tl]) if label and ((len(tl)==1) or (len(ql)==1)): mlv = min(tlvs) if (mlv==0) and (len(tl)>1): for d in tl: nk = (tuple(d[:-1]),None) if not d[-1] in tcore[k]: tcore[k][d[-1]] = {nk:Container([[d],[]])} else: tcore[k][d[-1]][nk] = Container([[d],[]]) else: if not mlv in tcore[k]: tcore[k][mlv] = {p:Container([tl,ql])} else: tcore[k][mlv][p] = Container([tl,ql]) else: for d in tl: nk = (tuple(d[:-1]),None) if not d[-1] in tcore[k]: tcore[k][d[-1]] = {nk:Container([[d],[]])} else: tcore[k][d[-1]][nk] = Container([[d],[]]) source.update(tcore) return tcore def _extract(self, base, pool, tg): for k in sorted(base): tl, ql = base[k].info topobounds = {} # bound positions should be passed down if len(tl)==len(ql)==1: d = tuple(tl[0]) topobounds[d] = tg.Domains[d] new = self.intersect(k[0],k[1]) pool.add(tuple(new)) self._updateTopo(topobounds, d, new) else: for d in map(tuple, tl): topobounds[d] = tg.Domains[d] self._updateTopo(topobounds, d, []) for lv in base[k]: for t in base[k][lv]: tl, ql = base[k][lv][t].info if (len(tl)==1) and (len(ql)==0): new = tl[0][:-1] if (len(tl)==1) and (len(ql)>=1): new = self.intersect(t[0],t[1]) if (len(tl)>1) and (len(ql)==1): new = self.intersect(t[0],t[1]) bounds = self._getTopo(topobounds, tuple(tl[0])) assert not bounds is None if len(bounds): if len(ql) == 0: continue new = self.intersect(new, bounds) if new[2] - new[1] < 3*self.res: continue if not len(bounds): if (len(tl)==1) and (len(ql)<=1): pool.add(tuple(new)) self._updateTopo(topobounds, tuple(tl[0]), new) else: pool.add(tuple(new)) for d in tl: self._updateTopo(topobounds, tuple(d), new) def _updateTopo(self, topo, d, bounds): for k in topo: if not d is None: if k == d: topo[k].bounds = bounds d = self._updateTopo(topo[k], None, bounds) else: d = self._updateTopo(topo[k], d, bounds) if d is None: break else: topo[k].bounds = bounds self._updateTopo(topo[k], None, bounds) return d def _getTopo(self, topo, d): bounds = None for k in topo: if k == d: bounds = topo[k].bounds else: bounds = self._getTopo(topo[k], d) if not bounds is None: break return bounds def intersect(self, d1, d2): return [d1[0], max(d1[1],d2[1]), min(d1[2],d2[2])] def _interp(self, rawlist): """ Fill holes for domains of all levels. We skip gap regions detected on any biological replicate dataset. Consider [['1',0,200000],['1',350000,600000]], then the region [200000,350000] is defined as a hole in original domain list. Parameters ---------- rawlist : list Original domain list. Each domain is represented by [chrom,start,end,label], in which *start* and *end* should be in base-pair unit, and *label* indicates the hierarchical level of the domain. Returns ------- domainlist : list Domain list after hole filling. See Also -------- tadlib.hitad.chromLev.Chrom.splitChrom : where the gap regions are detected """ self.gapbins = set() for rep in self.Queue: self.gapbins.update(self.Queue[rep].gapbins) bo = BoundSet('pse', rawlist, self.res) D = set() newlist = [] for d in rawlist: D.add(tuple(d[:3])) # Hash the domain list newlist.append(d[:3]) for i in range(len(bo.Bounds)-1): if bo.Bounds[i+1][0] != bo.Bounds[i][0]: continue td = (bo.Bounds[i][0], bo.Bounds[i][1], bo.Bounds[i+1][1]) if td[2] - td[1] < 3*self.res: continue dbset = set(range(td[1],td[2],self.res)) gapratio = len(dbset.intersection(self.gapbins)) / len(dbset) if gapratio >= 0.2: continue if not td in D: newlist.append(list(td)) domainlist = hierFormat(newlist) return domainlist
class repAligner(DomainAligner): """ A customized *tadlib.hitad.aligner.DomainAligner* for hierarchical domain alignment between two replicates. The API stays the same. """ def __init__(self, *args): DomainAligner.__init__(self, *args) def _align(self, tg, qy): # First, align the top-level domains ttl = [d for d in tg.Domains if d[-1]==0] tql = [d for d in qy.Domains if d[-1]==0] ttg = DomainSet(tg.Label, ttl, tg.res, False) tqy = DomainSet(qy.Label, tql, qy.res, False) pairs, _ = self._aligncore(ttg, tqy) cache = {} for k in sorted(pairs): tl = tg.getregion(*k[0]) ftg = DomainSet(tg.Label, tl, tg.res) ql = qy.getregion(*k[1]) fqy = DomainSet(qy.Label, ql, qy.res) cache[k] = Container(pairs[k]) # Parse the hierarchy step by step ori = 1 if len(pairs[k][0])==1 else 0 for tv in range(ori, max(ftg.levs)+1): cache[k][tv] = {} tl = ftg.getregion(*(k[0]+(tv,))) ntg = DomainSet(ftg.Label, tl, ftg.res, False) ql = self._localhits(ntg, fqy) nqy = DomainSet(fqy.Label, ql, fqy.res, False) npairs, _ = self._aligncore(ntg, nqy) for p in npairs: cache[k][tv][p] = Container(npairs[p]) return cache def _toberobust(self, tl, ql, tg, qy, t_ref, q_ref, tol=0.9): if t_ref is None: tl = tg.getregion(tl[0][0], tl[0][1], tl[-1][2]) ql = qy.getregion(ql[0][0], ql[0][1], ql[-1][2]) else: tl = t_ref.getregion(tl[0][0], tl[0][1], tl[-1][2], 0) ql = q_ref.getregion(ql[0][0], ql[0][1], ql[-1][2], 0) tl.sort() ql.sort() bylev = [[]] olev = ql[0][-1] for d in ql: if d[-1] == olev: bylev[-1].append(d) else: olev = d[-1] bylev.append([d]) if len(bylev) > 1: # query domain levels are heterogeneous ref = self.overlap([tl[0][1],tl[-1][2]], [ql[0][1],ql[-1][2]]) score = 0 hit = [] for c in bylev: tmp = self.overlap([tl[0][1],tl[-1][2]], [c[0][1],c[-1][2]]) if tmp / ref > score: score = tmp / ref hit = c if score >= tol: ql = hit tk = (tl[0][0], tl[0][1], tl[-1][2]) qk = (ql[0][0], ql[0][1], ql[-1][2]) return tk, qk, [tl, ql]