Skip to content

Commit

Permalink
Speed up bedGraph file writing by using C functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
taoliu committed Mar 12, 2015
1 parent 782bf5c commit f860986
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 112 deletions.
199 changes: 125 additions & 74 deletions MACS2/IO/CallPeakUnit.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Time-stamp: <2015-03-10 13:33:32 Tao Liu>
# Time-stamp: <2015-03-11 23:46:16 Tao Liu>

"""Module for Calculate Scores.
Expand Down Expand Up @@ -65,6 +65,15 @@ import logging

from time import time as ttime

from libc.stdio cimport *

cdef extern from "stdio.h":
ctypedef struct FILE
FILE *fopen (const char *filename, const char *opentype)
#FILE * fopen ( const char * filename, const char * mode )
int fclose (FILE *stream)
int fprintf (FILE *stream, const char *template, ...)

# ------------------------------------
# constants
# ------------------------------------
Expand Down Expand Up @@ -291,10 +300,12 @@ cdef class CallerFromAlignments:
# temporary data buffer
str chrom # name of current chromosome
list chr_pos_treat_ctrl # temporary [position, treat_pileup, ctrl_pileup] for a given chromosome
str bedGraph_treat_filename
str bedGraph_control_filename
object bedGraph_treat # file handler to write ChIP pileup
object bedGraph_ctrl # file handler to write Control pileup
char * bedGraph_treat_filename
char * bedGraph_control_filename
FILE * bedGraph_treat_f
FILE * bedGraph_ctrl_f
#object bedGraph_treat # file handler to write ChIP pileup
#object bedGraph_ctrl # file handler to write Control pileup
# data needed to be pre-computed before peak calling
object pqtable # remember pvalue->qvalue convertion
bool pvalue_all_done # whether the pvalue of whole genome is all calculated. If yes, it's OK to calculate q-value.
Expand Down Expand Up @@ -353,6 +364,9 @@ cdef class CallerFromAlignments:
cdef:
set chr1, chr2
int i
char * tmp
bytes tmp_bytes


if isinstance(treat, FWTrack):
self.PE_mode = False
Expand All @@ -377,8 +391,12 @@ cdef class CallerFromAlignments:
self.save_bedGraph = save_bedGraph
self.save_SPMR = save_SPMR
self.bedGraph_filename_prefix = bedGraph_filename_prefix
self.bedGraph_treat_filename = bedGraph_treat_filename
self.bedGraph_control_filename = bedGraph_control_filename
#tmp_bytes = bedGraph_treat_filename.encode('UTF-8')
#print bedGraph_treat_filename, tmp_bytes
self.bedGraph_treat_filename = <bytes>bedGraph_treat_filename
#tmp_bytes = bedGraph_control_filename.encode('UTF-8')
#print bedGraph_control_filename, tmp_bytes
self.bedGraph_control_filename = <bytes>bedGraph_control_filename

if not self.ctrl_d_s or not self.ctrl_scaling_factor_s:
self.no_lambda_flag = True
Expand Down Expand Up @@ -462,17 +480,12 @@ cdef class CallerFromAlignments:
self.chr_pos_treat_ctrl[1].resize(0,refcheck=False)
self.chr_pos_treat_ctrl[2].resize(0,refcheck=False)

# generate [array(BYTE4,[]),array(FBYTE4,[])] of pileup for a chromosome

if self.PE_mode:
treat_pv = self.treat.pileup_a_chromosome ( chrom, [self.treat_scaling_factor,], baseline_value = 0.0 )
else:
treat_pv = self.treat.pileup_a_chromosome( chrom, [self.d,], [self.treat_scaling_factor,], baseline_value = 0.0,
directional = True,
end_shift = self.end_shift )

#print "pileup treatment took:",ttime()-t0
#t0 = ttime()

if not self.no_lambda_flag:
if self.PE_mode:
Expand All @@ -488,10 +501,6 @@ cdef class CallerFromAlignments:

self.chr_pos_treat_ctrl = self.__chrom_pair_treat_ctrl( treat_pv, ctrl_pv)

#print "pileup control took:",ttime()-t0

#self.test_time += ttime() - t

# clean treat_pv and ctrl_pv
treat_pv = []
ctrl_pv = []
Expand Down Expand Up @@ -862,6 +871,7 @@ cdef class CallerFromAlignments:
cdef:
str chrom
str s
bytes tmp_bytes

peaks = PeakIO()

Expand All @@ -877,9 +887,8 @@ cdef class CallerFromAlignments:

# prepare bedGraph file
if self.save_bedGraph:

self.bedGraph_treat = open( self.bedGraph_treat_filename, "w" )
self.bedGraph_ctrl = open( self.bedGraph_control_filename, "w" )
self.bedGraph_treat_f = fopen( self.bedGraph_treat_filename, "w" )
self.bedGraph_ctrl_f = fopen( self.bedGraph_control_filename, "w" )

logging.info ("#3 In the peak calling step, the following will be performed simultaneously:")
logging.info ("#3 Write bedGraph files for treatment pileup (after scaling if necessary)... %s" % self.bedGraph_filename_prefix + "_treat_pileup.bdg")
Expand All @@ -894,8 +903,10 @@ cdef class CallerFromAlignments:

if self.trackline:
# this line is REQUIRED by the wiggle format for UCSC browser
self.bedGraph_treat.write( "track type=bedGraph name=\"treatment pileup\" description=\"treatment pileup after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix )
self.bedGraph_ctrl.write ( "track type=bedGraph name=\"control lambda\" description=\"control lambda after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix )
tmp_bytes = ("track type=bedGraph name=\"treatment pileup\" description=\"treatment pileup after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix).encode()
fprintf( self.bedGraph_treat_f, tmp_bytes )
tmp_bytes = ("track type=bedGraph name=\"control lambda\" description=\"control lambda after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix).encode()
fprintf( self.bedGraph_ctrl_f, tmp_bytes )

logging.info("#3 Call peaks for each chromosome...")
for chrom in self.chromosomes:
Expand All @@ -905,8 +916,8 @@ cdef class CallerFromAlignments:

# close bedGraph file
if self.save_bedGraph:
self.bedGraph_treat.close()
self.bedGraph_ctrl.close()
fclose(self.bedGraph_treat_f)
fclose(self.bedGraph_ctrl_f)
self.save_bedGraph = False

#print "time to build peak regions: %f" % self.test_time
Expand Down Expand Up @@ -999,9 +1010,6 @@ cdef class CallerFromAlignments:
#t0 = ttime()

# first bit of region above cutoff
#acs_next = iter(above_cutoff_startpos).next
#ace_next = iter(above_cutoff_endpos).next
#acia_next = iter(above_cutoff_index_array).next
acs_ptr = <int32_t *>above_cutoff_startpos.data
ace_ptr = <int32_t *>above_cutoff_endpos.data
acia_ptr= <int32_t *>above_cutoff_index_array.data
Expand All @@ -1027,8 +1035,8 @@ cdef class CallerFromAlignments:
acs_ptr += 1
ace_ptr += 1
acia_ptr+= 1
tp = treat_array[ ti ]
cp = ctrl_array[ ti ]
tp = treat_array_ptr[ ti ]
cp = ctrl_array_ptr[ ti ]
tl = ts - lastp
if tl <= max_gap:
# append.
Expand Down Expand Up @@ -1341,13 +1349,23 @@ cdef class CallerFromAlignments:
"""
cdef:
np.ndarray pos_array, treat_array, ctrl_array
np.ndarray[np.int32_t, ndim=1] pos_array
np.ndarray[np.float32_t, ndim=1] treat_array, ctrl_array
int32_t * pos_array_ptr
float32_t * treat_array_ptr
float32_t * ctrl_array_ptr
int l, i
int p, pre_p_t, pre_p_c # current position, previous position for treat, previous position for control
float pre_v_t, pre_v_c, v_t, v_c # previous value for treat, for control, current value for treat, for control
float denominator # 1 if save_SPMR is false, or depth in million if save_SPMR is true. Note, while piling up and calling peaks, treatment and control have been scaled to the same depth, so we need to find what this 'depth' is.
FILE * ft
FILE * fc
bytes tmp_bytes

[pos_array, treat_array, ctrl_array] = self.chr_pos_treat_ctrl
pos_array_ptr = <int32_t *> pos_array.data
treat_array_ptr = <float32_t *> treat_array.data
ctrl_array_ptr = <float32_t *> ctrl_array.data

if self.save_SPMR:
if self.treat_scaling_factor == 1:
Expand All @@ -1364,34 +1382,48 @@ cdef class CallerFromAlignments:
if l == 0: # if there is no data, return
return False

# for treat
t_write_func = self.bedGraph_treat.write
c_write_func = self.bedGraph_ctrl.write
ft = self.bedGraph_treat_f
fc = self.bedGraph_ctrl_f
#t_write_func = self.bedGraph_treat.write
#c_write_func = self.bedGraph_ctrl.write

pre_p_t = 0
pre_p_c = 0
pre_v_t = treat_array[ 0 ]/denominator
pre_v_c = ctrl_array [ 0 ]/denominator
pre_v_t = treat_array_ptr[ 0 ]/denominator
pre_v_c = ctrl_array_ptr [ 0 ]/denominator
treat_array_ptr += 1
ctrl_array_ptr += 1

for i in range( 1, l ):
v_t = treat_array[ i ]/denominator
v_c = ctrl_array [ i ]/denominator
p = pos_array [ i-1 ]
v_t = treat_array_ptr[ 0 ]/denominator
v_c = ctrl_array_ptr [ 0 ]/denominator
p = pos_array_ptr [ 0 ]
pos_array_ptr += 1
treat_array_ptr += 1
ctrl_array_ptr += 1

if abs(pre_v_t - v_t) > 1e-5: # precision is 5 digits
t_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_t, p, pre_v_t ) )
tmp_bytes = ("%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_t, p, pre_v_t )).encode()
#t_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_t, p, pre_v_t ) )
fprintf( ft, tmp_bytes )
pre_v_t = v_t
pre_p_t = p

if abs(pre_v_c - v_c) > 1e-5: # precision is 5 digits
c_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_c, p, pre_v_c ) )
tmp_bytes = ("%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_c, p, pre_v_c )).encode()
fprintf( fc, tmp_bytes )
#c_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_c, p, pre_v_c ) )
pre_v_c = v_c
pre_p_c = p

p = pos_array[ -1 ]
p = pos_array_ptr[ 0 ]
# last one
t_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_t, p, pre_v_t ) )
c_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_c, p, pre_v_c ) )
tmp_bytes = ("%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_t, p, pre_v_t )).encode()
fprintf( ft, tmp_bytes )
tmp_bytes = ("%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_c, p, pre_v_c )).encode()
fprintf( fc, tmp_bytes )
#t_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_t, p, pre_v_t ) )
#c_write_func( "%s\t%d\t%d\t%.5f\n" % ( chrom, pre_p_c, p, pre_v_c ) )

return True

Expand Down Expand Up @@ -1436,25 +1468,29 @@ cdef class CallerFromAlignments:
# prepare bedGraph file
if self.save_bedGraph:

self.bedGraph_treat = open( self.bedGraph_treat_filename, "w" )
self.bedGraph_ctrl = open( self.bedGraph_control_filename, "w" )
self.bedGraph_treat_f = fopen( self.bedGraph_treat_filename, "w" )
self.bedGraph_ctrl_f = fopen( self.bedGraph_control_filename, "w" )
logging.info ("#3 In the peak calling step, the following will be performed simultaneously:")
logging.info ("#3 Write bedGraph files for treatment pileup (after scaling if necessary)... %s" % self.bedGraph_filename_prefix + "_treat_pileup.bdg")
logging.info ("#3 Write bedGraph files for control lambda (after scaling if necessary)... %s" % self.bedGraph_filename_prefix + "_control_lambda.bdg")

if self.trackline:
# this line is REQUIRED by the wiggle format for UCSC browser
self.bedGraph_treat.write( "track type=bedGraph name=\"treatment pileup\" description=\"treatment pileup after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix )
self.bedGraph_ctrl.write ( "track type=bedGraph name=\"control lambda\" description=\"control lambda after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix )
tmp_bytes = ("track type=bedGraph name=\"treatment pileup\" description=\"treatment pileup after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix).encode()
fprintf( self.bedGraph_treat_f, tmp_bytes )
tmp_bytes = ("track type=bedGraph name=\"control lambda\" description=\"control lambda after possible scaling for \'%s\'\"\n" % self.bedGraph_filename_prefix).encode()
fprintf( self.bedGraph_ctrl_f, tmp_bytes )


logging.info("#3 Call peaks for each chromosome...")
for chrom in self.chromosomes:
self.__chrom_call_broadpeak_using_certain_criteria ( lvl1peaks, lvl2peaks, chrom, scoring_function_symbols, lvl1_cutoff_s, lvl2_cutoff_s, min_length, lvl1_max_gap, lvl2_max_gap, self.save_bedGraph )

# close bedGraph file
if self.save_bedGraph:
self.bedGraph_treat.close()
self.bedGraph_ctrl.close()
fclose( self.bedGraph_treat_f )
fclose( self.bedGraph_ctrl_f )
#self.bedGraph_ctrl.close()
self.save_bedGraph = False

# now combine lvl1 and lvl2 peaks
Expand Down Expand Up @@ -1517,6 +1553,11 @@ cdef class CallerFromAlignments:
np.ndarray above_cutoff_index_array
list score_array_s # list to keep different types of scores
list peak_content
int32_t * acs_ptr
int32_t * ace_ptr
int32_t * acia_ptr
float32_t * treat_array_ptr
float32_t * ctrl_array_ptr

assert len(scoring_function_s) == len(lvl1_cutoff_s), "number of functions and cutoffs should be the same!"
assert len(scoring_function_s) == len(lvl2_cutoff_s), "number of functions and cutoffs should be the same!"
Expand Down Expand Up @@ -1560,26 +1601,31 @@ cdef class CallerFromAlignments:
above_cutoff_startpos[0] = 0

# first bit of region above cutoff
acs_next = iter(above_cutoff_startpos).next
ace_next = iter(above_cutoff_endpos).next
acia_next = iter(above_cutoff_index_array).next
acs_ptr = <int32_t *>above_cutoff_startpos.data
ace_ptr = <int32_t *>above_cutoff_endpos.data
acia_ptr= <int32_t *>above_cutoff_index_array.data
treat_array_ptr = <float32_t *> treat_array.data
ctrl_array_ptr = <float32_t *> ctrl_array.data

ts = acs_next()
te = ace_next()
ti = acia_next()
tp = treat_array[ ti ]
cp = ctrl_array[ ti ]
ts = acs_ptr[ 0 ]
te = ace_ptr[ 0 ]
ti = acia_ptr[ 0 ]
tp = treat_array_ptr[ ti ]
cp = ctrl_array_ptr[ ti ]

peak_content.append( ( ts, te, tp, cp, ti ) )
lastp = te

#peak_content.append( (above_cutoff_startpos[0], above_cutoff_endpos[0], treat_array[above_cutoff_index_array[0]], ctrl_array[above_cutoff_index_array[0]], score_array_s, above_cutoff_index_array[0] ) )
for i in range( 1, above_cutoff_startpos.size ):
ts = acs_next()
te = ace_next()
ti = acia_next()
tp = treat_array[ ti ]
cp = ctrl_array[ ti ]
ts = acs_ptr[ 0 ]
te = ace_ptr[ 0 ]
ti = acia_ptr[ 0 ]
acs_ptr += 1
ace_ptr += 1
acia_ptr+= 1
tp = treat_array_ptr[ ti ]
cp = ctrl_array_ptr[ ti ]
tl = ts - lastp
if tl <= lvl1_max_gap:
# append
Expand Down Expand Up @@ -1615,24 +1661,29 @@ cdef class CallerFromAlignments:
above_cutoff_startpos[0] = 0

# first bit of region above cutoff
acs_next = iter(above_cutoff_startpos).next
ace_next = iter(above_cutoff_endpos).next
acia_next = iter(above_cutoff_index_array).next
acs_ptr = <int32_t *>above_cutoff_startpos.data
ace_ptr = <int32_t *>above_cutoff_endpos.data
acia_ptr= <int32_t *>above_cutoff_index_array.data
treat_array_ptr = <float32_t *> treat_array.data
ctrl_array_ptr = <float32_t *> ctrl_array.data

ts = acs_next()
te = ace_next()
ti = acia_next()
tp = treat_array[ ti ]
cp = ctrl_array[ ti ]
ts = acs_ptr[ 0 ]
te = ace_ptr[ 0 ]
ti = acia_ptr[ 0 ]
tp = treat_array_ptr[ ti ]
cp = ctrl_array_ptr[ ti ]

peak_content.append( ( ts, te, tp, cp, ti ) )
lastp = te
for i in range( 1, above_cutoff_startpos.size ):
ts = acs_next()
te = ace_next()
ti = acia_next()
tp = treat_array[ ti ]
cp = ctrl_array[ ti ]
ts = acs_ptr[ 0 ]
te = ace_ptr[ 0 ]
ti = acia_ptr[ 0 ]
acs_ptr += 1
ace_ptr += 1
acia_ptr+= 1
tp = treat_array_ptr[ ti ]
cp = ctrl_array_ptr[ ti ]
tl = ts - lastp
if tl <= lvl2_max_gap:
# append
Expand Down
Loading

0 comments on commit f860986

Please sign in to comment.