# -*- coding: utf-8 -*-
"""
Created on Mon Apr 25 10:36:44 2016
@author: wxt
"""
from __future__ import division
import os, pickle, tempfile, time, logging, cooler
import multiprocessing as mp
import numpy as np
from scipy.sparse import triu
from pomegranate import NormalDistribution, HiddenMarkovModel, GeneralMixtureModel, State
from tadlib.hitad.chromLev import Chrom, MultiReps
total_cpu = mp.cpu_count()
log = logging.getLogger(__name__)
class Genome(object):
"""
*Genome* is built on top of :py:class:`tadlib.hitad.chromLev.Chrom`. We
use it to:
- Load bin-level Hi-C data
- Initialize, pickle and organize *Chrom* objects
- Call hierarchical domains of each chromosome in parallel
Parameters
----------
    datasets : 2-level dict, {resolution(int):{biological_replicate_label(str):data_path,...}}
        *data_path* indicates the *cool* URI under the corresponding resolution
        and biological replicate label.
    balance_type : str
        Name of the column in the *cool* bin table to be used as balancing
        weights, or 'raw' to use raw contact counts. (Default: 'weight')
    maxsize : int
        Maximum allowable domain size in base-pair unit. (Default: 4000000)
    cache : str or None
        Cache folder path. If None, the folder returned by
        :py:func:`tempfile.gettempdir` will be used. (Default: None)
    exclude : list of str
        Chromosomes to be excluded from the analysis. (Default: ['chrM', 'chrY'])
    DIcol : str
        Name of the bin-table column in which the computed DI (Directionality
        Index) track will be stored. (Default: 'DIs')
Attributes
----------
    data : 3-level dict. {chrom(str):{resolution(int):{biological_replicate_label(str):cachedfile,...}},...}
        Unlike the input *datasets*, this mapping is organized by chromosome
        label, and each bottom-level value points to a pickled
        :py:class:`tadlib.hitad.chromLev.Chrom` object under the *cache* folder.
Results : list
Final consistent domain list merged from all chromosome results.
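    Examples
    --------
    A minimal sketch of the intended workflow; the cool file names below are
    hypothetical placeholders:

    >>> datasets = {25000: {'rep1': 'sample-rep1.25K.cool',
    ...                     'rep2': 'sample-rep2.25K.cool'}}
    >>> g = Genome(datasets, balance_type='weight', maxsize=4000000)
    >>> g.learning(cpu_core=4)
    >>> g.callHierDomain(cpu_core=4)
    >>> g.outputDomain('hierarchical-domains.txt')
    >>> g.wipeDisk()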
"""
def __init__(self, datasets, balance_type='weight', maxsize=4000000, cache=None,
exclude=['chrM', 'chrY'], DIcol='DIs'):
data = datasets
self.exclude = exclude
self.DIcol = DIcol
if balance_type.lower() == 'raw':
correct = False
else:
correct = balance_type
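        # Note: cooler's matrix() accepts either False (raw counts) or the
        # name of a bin-table column holding pre-computed balancing weights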
        # We don't read the data into memory at this point; we only build
        # a chromosome-level mapping for convenient loading later
self.data = {}
for res in data:
for rep in data[res]:
lib = cooler.Cooler(data[res][rep])
for i in lib.chromnames:
if i in self.exclude:
continue
                    if i not in self.data:
self.data[i] = {res:{rep:lib}}
else:
if res in self.data[i]:
self.data[i][res][rep] = lib
else:
self.data[i][res] = {rep:lib}
        if cache is None:
            self._cache = tempfile.gettempdir()
        else:
            self._cache = os.path.abspath(os.path.expanduser(cache))
            if not os.path.isdir(self._cache):
                os.makedirs(self._cache)
        # Before domain calling, cache a pickled Chrom object for each dataset under the cache folder
for chrom in self.data:
log.debug('Chrom {0}:'.format(chrom))
ms = self.data[chrom]
for res in ms:
for rep in ms[res]:
log.debug(' resolution: {0}, {1}'.format(res, rep))
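                    # fetch the intra-chromosomal contact matrix, keep only its
                    # upper triangle, and convert to CSR format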
tdata = triu(ms[res][rep].matrix(balance=correct, sparse=True).fetch(chrom)).tocsr()
work = Chrom(chrom, res, tdata, rep, maxsize)
work.Label = rep
tl = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
kw = {'suffix':tl, 'dir':self._cache}
fd, tmpfil = tempfile.mkstemp(**kw)
os.close(fd)
with open(tmpfil, 'wb') as output:
log.debug(' Cache Chrom object into {0} ...'.format(tmpfil))
pickle.dump(work, output)
self.data[chrom][res][rep] = tmpfil
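                    # pause so that consecutive cache files receive distinct timestamp suffixes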
time.sleep(3)
self.cools = datasets
    def oriHMMParams(self):
"""
Set initial parameters for the Hidden Markov Model (HMM).
Attributes
----------
HMMParams : dict
Has 3 keys: "A", state transition matrix, "B" (emission probabilities),
specifying parameters (Means, Variances, Weights) of the mixture
Gaussian distributions for each hidden state, and "pi", indicating
the hidden state weights. This dict will be updated after learning
procedure.
"""
hmm = HiddenMarkovModel()
# GMM emissions
# 4 Hidden States:
# 0--start, 1--downstream, 2--upstream, 3--end
numdists = 3 # Three-distribution Gaussian Mixtures
var = 7.5 / (numdists - 1)
means = [[], [], [], []]
for i in range(numdists):
means[3].append(i * 7.5 / ( numdists - 1 ) + 2.5)
means[2].append(i * 7.5 / ( numdists - 1 ))
means[1].append(-i * 7.5 / ( numdists - 1 ))
means[0].append(-i * 7.5 / ( numdists - 1 ) - 2.5)
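        # With numdists = 3, the initial emission means are:
        #   state 0 (start):      [-2.5, -6.25, -10.0]
        #   state 1 (downstream): [ 0.0, -3.75,  -7.5]
        #   state 2 (upstream):   [ 0.0,  3.75,   7.5]
        #   state 3 (end):        [ 2.5,  6.25,  10.0]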
states = []
for i, m in enumerate(means):
tmp = []
for j in m:
tmp.append(NormalDistribution(j, var))
mixture = GeneralMixtureModel(tmp)
states.append(State(mixture, name=str(i)))
hmm.add_states(*tuple(states))
        # Transition matrix
#A = [[0., 1., 0., 0.],
# [0., 0.5, 0.5, 0.],
# [0., 0., 0.5, 0.5],
# [1., 0., 0., 0.]]
hmm.add_transition(states[0], states[1], 1)
hmm.add_transition(states[1], states[1], 0.5)
hmm.add_transition(states[1], states[2], 0.5)
hmm.add_transition(states[2], states[2], 0.5)
hmm.add_transition(states[2], states[3], 0.5)
hmm.add_transition(states[3], states[0], 1)
#pi = [0.2, 0.3, 0.3, 0.2]
hmm.add_transition(hmm.start, states[0], 1)
hmm.add_transition(states[3], hmm.end, 1)
hmm.bake()
return hmm
def train_data(self, res, rep):
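        """
        Collect DI training sequences for HMM parameter learning.

        For each non-excluded chromosome of the given dataset, compute the
        DI track, split the chromosome into gap-free regions, and keep the
        nonzero DI values of every region with more than 20 nonzero values.

        Parameters
        ----------
        res : int
            Resolution in base pairs.
        rep : str
            Biological replicate label.

        Returns
        -------
        seqs : list of np.ndarray
            One training sequence per qualified region.
        """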
lib = cooler.Cooler(self.cools[res][rep])
seqs = []
for c in lib.chromnames:
if c in self.exclude:
continue
tmpfil = self.data[c][res][rep]
with open(tmpfil, 'rb') as source:
tmpcache = pickle.load(source)
tmpcache.minWindows(0, tmpcache.chromLen, tmpcache._dw)
tmpcache.calDI(tmpcache.windows, 0)
tmpcache.splitChrom(tmpcache.DIs)
for region in tmpcache.regionDIs:
withzeros = tmpcache.regionDIs[region]
nozeros = withzeros[withzeros!=0]
if nozeros.size > 20:
seqs.append(nozeros)
return seqs
    def learning(self, cpu_core = 1):
"""
Prepare training data and learn HMM model parameters for each dataset.
Parameters
----------
cpu_core : int
Number of processes to launch.
"""
for res in self.cools:
for rep in self.cools[res]:
log.debug(' resolution: {0}, {1}'.format(res, rep))
model = self.oriHMMParams()
seqs = self.train_data(res, rep)
model.fit(seqs, algorithm='baum-welch', max_iterations=10000,
stop_threshold=1e-5, n_jobs=cpu_core, verbose=False)
lib = cooler.Cooler(self.cools[res][rep])
for c in lib.chromnames:
if c in self.exclude:
continue
tmpfil = self.data[c][res][rep]
with open(tmpfil, 'rb') as source:
tmpcache = pickle.load(source)
tmpcache.hmm = model
# same hmm for all chromosomes
with open(tmpfil, 'wb') as output:
pickle.dump(tmpcache, output)
    def callHierDomain(self, cpu_core = 1):
"""
Identify hierarchical domains of each chromosome independently
and concurrently, find consistent domains between biological
replicates, and finally combine results of all chromosomes.
Parameters
----------
        cpu_core : int
            Number of processes to launch. For now, *hitad* only supports
            parallelism at a rather coarse-grained level: it simply assigns
            each unprocessed :py:class:`tadlib.hitad.chromLev.Chrom` object
            to an idle process and invokes its *callDomain* method.
"""
class SubProcess(mp.Process):
def __init__(self, task_queue):
mp.Process.__init__(self)
self.task_queue = task_queue
def run(self):
proc_name = self.name
while True:
current = self.task_queue.get()
if current is None:
log.debug('{0}: completed'.format(proc_name))
self.task_queue.task_done()
break
chrom, res, rep = current
log.debug('{0}: Chrom {1} (res {2}, {3})'.format(proc_name, chrom, res, rep))
worker(chrom, res, rep)
self.task_queue.task_done()
def worker(chrom, res, rep):
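            # load the pickled Chrom object, detect its domain hierarchy, and
            # write the updated object back to the cache file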
tmpfil = self.data[chrom][res][rep]
with open(tmpfil, 'rb') as source:
curChrom = pickle.load(source)
curChrom.callDomain()
# Update cache data
with open(tmpfil, 'wb') as output:
pickle.dump(curChrom, output)
cpu_core = min(cpu_core, total_cpu)
log.debug('Spawn {0} subprocesses ...'.format(cpu_core))
tasks = mp.JoinableQueue()
procs = [SubProcess(tasks) for i in range(cpu_core)]
for p in procs:
p.start()
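        # enqueue one task per (chromosome, resolution, replicate) triple,
        # followed by one sentinel (None) per worker process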
for chrom in self.data:
for res in self.data[chrom]:
for rep in self.data[chrom][res]:
tasks.put((chrom, res, rep))
for p in procs:
tasks.put(None)
tasks.join()
log.debug('Extract reproducible domains from replicates ...')
self.Results = []
self.DIs = {}
for chrom in self.data:
log.debug('Chrom {0} ...'.format(chrom))
pool = {}
for res in self.data[chrom]:
reps = {}
for rep in self.data[chrom][res]:
tmpfil = self.data[chrom][res][rep]
with open(tmpfil, 'rb') as source:
reps[rep] = pickle.load(source)
mrep = MultiReps(chrom, res, reps)
mrep.callDomain()
pool[res] = mrep
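            # keep the domains called at the highest resolution (smallest bin size)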
minres = min(pool)
self.Results.extend(pool[minres].mergedDomains)
log.debug('Add the DI column into cools ...')
for res in self.cools:
for rep in self.cools[res]:
lib = cooler.Cooler(self.cools[res][rep])
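                # assemble a genome-wide DI track following the chromosome order
                # of the cool file; excluded chromosomes are filled with zeros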
DIs = np.r_[[]]
for c in lib.chromnames:
                if c not in self.exclude:
tmpfil = self.data[c][res][rep]
with open(tmpfil, 'rb') as source:
tmpcache = pickle.load(source)
DIs = np.r_[DIs, tmpcache.DIs]
else:
DIs = np.r_[DIs, np.zeros(len(lib.bins().fetch(c)))]
with lib.open('r+') as grp:
if self.DIcol in grp['bins']:
del grp['bins'][self.DIcol]
h5opts = dict(compression='gzip', compression_opts=6)
grp['bins'].create_dataset(self.DIcol, data=DIs, **h5opts)
def outputDomain(self, filename):
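        """
        Write the final domain list (*Results*) to a tab-delimited text file,
        one domain (four fields) per line.

        Parameters
        ----------
        filename : str
            Output file path.
        """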
with open(filename, 'w') as output:
for d in self.Results:
line = '{0}\t{1}\t{2}\t{3}\n'.format(*tuple(d))
output.write(line)
    def wipeDisk(self):
"""
        Remove the cached (pickled) :py:class:`tadlib.hitad.chromLev.Chrom`
        objects before exiting.
"""
for chrom in self.data:
for res in self.data[chrom]:
for rep in self.data[chrom][res]:
os.remove(self.data[chrom][res][rep])