1#!/usr/bin/env python
2# Time-stamp: <2019-09-25 14:44:07 taoliu>
3
4import io
5import unittest
6from numpy.testing import assert_equal,  assert_almost_equal, assert_array_equal
7
8from MACS2.IO.ScoreTrack import *
9from MACS2.IO.BedGraph import bedGraphTrackI
10
11class Test_TwoConditionScores(unittest.TestCase):
12    def setUp(self):
13        self.t1bdg = bedGraphTrackI()
14        self.t2bdg = bedGraphTrackI()
15        self.c1bdg = bedGraphTrackI()
16        self.c2bdg = bedGraphTrackI()
17        self.test_regions1 = [(b"chrY",0,70,0.00,0.01),
18                              (b"chrY",70,80,7.00,0.5),
19                              (b"chrY",80,150,0.00,0.02)]
20        self.test_regions2 = [(b"chrY",0,75,20.0,4.00),
21                              (b"chrY",75,90,35.0,6.00),
22                              (b"chrY",90,150,10.0,15.00)]
23        for a in self.test_regions1:
24            self.t1bdg.safe_add_loc(a[0],a[1],a[2],a[3])
25            self.c1bdg.safe_add_loc(a[0],a[1],a[2],a[4])
26
27        for a in self.test_regions2:
28            self.t2bdg.safe_add_loc(a[0],a[1],a[2],a[3])
29            self.c2bdg.safe_add_loc(a[0],a[1],a[2],a[4])
30
31        self.twoconditionscore = TwoConditionScores( self.t1bdg,
32                                                     self.c1bdg,
33                                                     self.t2bdg,
34                                                     self.c2bdg,
35                                                     1.0,
36                                                     1.0 )
37        self.twoconditionscore.build()
38        self.twoconditionscore.finalize()
39        (self.cat1,self.cat2,self.cat3) = self.twoconditionscore.call_peaks(min_length=10, max_gap=10, cutoff=3)
40
41class Test_ScoreTrackII(unittest.TestCase):
42
43    def setUp(self):
44        # for initiate scoretrack
45        self.test_regions1 = [(b"chrY",10,100,10),
46                              (b"chrY",60,10,10),
47                              (b"chrY",110,15,20),
48                              (b"chrY",160,5,20),
49                              (b"chrY",210,20,5)]
50        self.treat_edm = 10
51        self.ctrl_edm = 5
52        # for different scoring method
53        self.p_result = [60.49, 0.38, 0.08, 0.0, 6.41] # -log10(p-value), pseudo count 1 added
54        self.q_result = [58.17, 0.0, 0.0, 0.0, 5.13] # -log10(q-value) from BH, pseudo count 1 added
55        self.l_result = [58.17, 0.0, -0.28, -3.25, 4.91] # log10 likelihood ratio, pseudo count 1 added
56        self.f_result = [0.96, 0.00, -0.12, -0.54, 0.54] # note, pseudo count 1 would be introduced.
57        self.d_result = [90.00, 0, -5.00, -15.00, 15.00]
58        self.m_result = [10.00, 1.00, 1.50, 0.50, 2.00]
59        # for norm
60        self.norm_T = np.array([[ 10, 100,  20,   0],
61                                [ 60,  10,  20,   0],
62                                [110,  15,  40,   0],
63                                [160,   5,  40,   0],
64                                [210,  20,  10,   0]]).transpose()
65        self.norm_C = np.array([[ 10,  50,  10,   0],
66                                [ 60,   5,  10,   0],
67                                [110,   7.5,  20,   0],
68                                [160,   2.5,  20,   0],
69                                [210,  10,   5,   0]]).transpose()
70        self.norm_M = np.array([[ 10,  10,   2,   0],
71                                [ 60,   1,   2,   0],
72                                [110,   1.5,   4,   0],
73                                [160,   0.5,   4,   0],
74                                [210,   2,   1,   0]]).transpose()
75        self.norm_N = np.array([[ 10, 100,  10,   0],  # note precision lost
76                                [ 60,  10,  10,   0],
77                                [110,  15,  20,   0],
78                                [160,   5,  20,   0],
79                                [210,  20,   5,   0]]).transpose()
80
81        # for write_bedGraph
82        self.bdg1 = """chrY	0	10	100.00000
83chrY	10	60	10.00000
84chrY	60	110	15.00000
85chrY	110	160	5.00000
86chrY	160	210	20.00000
87"""
88        self.bdg2 = """chrY	0	60	10.00000
89chrY	60	160	20.00000
90chrY	160	210	5.00000
91"""
92        self.bdg3 = """chrY	0	10	60.48912
93chrY	10	60	0.37599
94chrY	60	110	0.07723
95chrY	110	160	0.00006
96chrY	160	210	6.40804
97"""
98        # for peak calls
99        self.peak1 = """chrY	0	60	peak_1	60.4891
100chrY	160	210	peak_2	6.40804
101"""
102        self.summit1 = """chrY	5	6	peak_1	60.4891
103chrY	185	186	peak_2	6.40804
104"""
105        self.xls1    ="""chr	start	end	length	abs_summit	pileup	-log10(pvalue)	fold_enrichment	-log10(qvalue)	name
106chrY	1	60	60	6	100	63.2725	9.18182	-1	MACS_peak_1
107chrY	161	210	50	186	20	7.09102	3.5	-1	MACS_peak_2
108"""
109
110    def assertListAlmostEqual ( self, a, b, places =2 ):
111        return all( [self.assertAlmostEqual(x, y, places=places) for (x, y) in zip( a, b)] )
112
113    def test_compute_scores(self):
114        s1 = scoreTrackII( self.treat_edm, self.ctrl_edm )
115        s1.add_chromosome( b"chrY", 5 )
116        for a in self.test_regions1:
117            s1.add( a[0],a[1],a[2],a[3] )
118
119        s1.set_pseudocount ( 1.0 )
120
121        s1.change_score_method( ord('p') )
122        r = s1.get_data_by_chr(b"chrY")
123        self.assertListAlmostEqual( [round(x,2) for x in r[3]], self.p_result )
124
125        s1.change_score_method( ord('q') )
126        r = s1.get_data_by_chr(b"chrY")
127        self.assertListAlmostEqual( [round(x,2) for x in list(r[3])], self.q_result )
128
129        s1.change_score_method( ord('l') )
130        r = s1.get_data_by_chr(b"chrY")
131        self.assertListAlmostEqual( [round(x,2) for x in list(r[3])], self.l_result )
132
133        s1.change_score_method( ord('f') )
134        r = s1.get_data_by_chr(b"chrY")
135        self.assertListAlmostEqual( [round(x,2) for x in list(r[3])], self.f_result )
136
137        s1.change_score_method( ord('d') )
138        r = s1.get_data_by_chr(b"chrY")
139        self.assertListAlmostEqual( [round(x,2) for x in list(r[3])], self.d_result )
140
141        s1.change_score_method( ord('m') )
142        r = s1.get_data_by_chr(b"chrY")
143        self.assertListAlmostEqual( [round(x,2) for x in list(r[3])], self.m_result )
144
145    def test_normalize(self):
146        s1 = scoreTrackII( self.treat_edm, self.ctrl_edm )
147        s1.add_chromosome( b"chrY", 5 )
148        for a in self.test_regions1:
149            s1.add( a[0],a[1],a[2],a[3] )
150
151        s1.change_normalization_method( ord('T') )
152        r = s1.get_data_by_chr(b"chrY")
153        assert_array_equal( r, self.norm_T )
154
155        s1.change_normalization_method( ord('C') )
156        r = s1.get_data_by_chr(b"chrY")
157        assert_array_equal( r, self.norm_C )
158
159        s1.change_normalization_method( ord('M') )
160        r = s1.get_data_by_chr(b"chrY")
161        assert_array_equal( r, self.norm_M )
162
163        s1.change_normalization_method( ord('N') )
164        r = s1.get_data_by_chr(b"chrY")
165        assert_array_equal( r, self.norm_N )
166
167    def test_writebedgraph ( self ):
168        s1 = scoreTrackII( self.treat_edm, self.ctrl_edm )
169        s1.add_chromosome( b"chrY", 5 )
170        for a in self.test_regions1:
171            s1.add( a[0],a[1],a[2],a[3] )
172
173        s1.change_score_method( ord('p') )
174
175        strio = io.StringIO()
176        s1.write_bedGraph( strio, "NAME", "DESC", 1 )
177        self.assertEqual( strio.getvalue(), self.bdg1 )
178        strio = io.StringIO()
179        s1.write_bedGraph( strio, "NAME", "DESC", 2 )
180        self.assertEqual( strio.getvalue(), self.bdg2 )
181        strio = io.StringIO()
182        s1.write_bedGraph( strio, "NAME", "DESC", 3 )
183        self.assertEqual( strio.getvalue(), self.bdg3 )
184
185    def test_callpeak ( self ):
186        s1 = scoreTrackII( self.treat_edm, self.ctrl_edm )
187        s1.add_chromosome( b"chrY", 5 )
188        for a in self.test_regions1:
189            s1.add( a[0],a[1],a[2],a[3] )
190
191        s1.change_score_method( ord('p') )
192        p = s1.call_peaks( cutoff = 0.10, min_length=10, max_gap=10 )
193        strio = io.StringIO()
194        p.write_to_bed( strio, trackline = False )
195        self.assertEqual( strio.getvalue(), self.peak1 )
196
197        strio = io.StringIO()
198        p.write_to_summit_bed( strio, trackline = False )
199        self.assertEqual( strio.getvalue(), self.summit1 )
200
201        strio = io.StringIO()
202        p.write_to_xls( strio )
203        self.assertEqual( strio.getvalue(), self.xls1 )
204
205