1#  _______________________________________________________________________
2#
3#  PECOS: Parallel Environment for Creation Of Stochastics
4#  Copyright (c) 2011, Sandia National Laboratories.
5#  This software is distributed under the GNU Lesser General Public License.
6#  For more information, see the README file in the top Pecos directory.
7#  _______________________________________________________________________
8
9import unittest, numpy, os
10from PyDakota.regression import *
11from PyDakota.options_list import OptionsList
class TestLASSO(unittest.TestCase):
    """Tests for least angle regression (LAR) and its LASSO variant."""

    def setUp( self ):
        # Data sets shared by the tests below are loaded once per test.
        self.diabetes_matrix, self.diabetes_rhs = self.load_diabetes_data()
        self.pce_matrix, self.pce_rhs, self.pce_exact_coef = self.load_pce_data()

    def load_diabetes_data( self ):
        """Return (matrix, rhs) of the diabetes regression data set."""
        base_dir = os.path.join(os.path.dirname(__file__), 'data')
        matrix = numpy.loadtxt(os.path.join(base_dir, 'diabetes_data.csv.gz'))
        rhs = numpy.loadtxt(os.path.join(base_dir, 'diabetes_target.csv.gz'))
        return matrix, rhs

    def load_pce_data( self ):
        """Return (matrix, rhs, exact_coef) of the PCE recovery data set."""
        base_dir = os.path.join(os.path.dirname(__file__), 'data')
        matrix = numpy.loadtxt(os.path.join(base_dir, 'pce_data.csv.gz'))
        rhs = numpy.loadtxt(os.path.join(base_dir, 'pce_target.csv.gz'))
        exact_coef = numpy.loadtxt(os.path.join(base_dir, 'pce_coef.csv.gz'))
        return matrix, rhs, exact_coef

    def test_lar_last_step( self ):
        """
        Test that the last step of least angle regression returns the
        least squares solution
        """
        solver = LARSolver()
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs
        for store_history in [False,True]:
            regression_opts = {'regression_type':LEAST_ANGLE_REGRESSION,
                               'verbosity':0,'store-history':store_history}
            regression_opts = OptionsList(regression_opts)
            solver.solve(matrix, rhs, regression_opts)
            coef_lasso = solver.get_solutions_for_all_regularization_params(0)
            coef_lstsq = numpy.linalg.lstsq(matrix, rhs)[0]
            # Final LAR step must coincide with the least squares solution.
            assert numpy.allclose(coef_lasso[:,-1].squeeze(), coef_lstsq.squeeze())

    def test_store_history_consistent_with_no_history(self):
        """
        Test that the final solution is the same whether or not the full
        solution history is stored.
        """
        solver = LARSolver()
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs

        coef_lasso = []; residuals=[]
        for store_history in [False,True]:
            regression_opts = {'regression_type':LEAST_ANGLE_REGRESSION,
                               'verbosity':0,'store-history':store_history}
            # BUG FIX: the options dict was previously passed to solve()
            # without the OptionsList wrapper used by every other test.
            regression_opts = OptionsList(regression_opts)
            solver.solve(matrix, rhs, regression_opts)
            coef_lasso.append(solver.get_solutions_for_all_regularization_params(0))
            residuals.append(solver.get_residuals_for_all_regularization_params(0))
        # Without history only the final solution is stored (column 0);
        # with history it is the last column.
        assert numpy.allclose(coef_lasso[0][:,0],coef_lasso[1][:,-1],atol=1e-15)
        assert numpy.allclose(residuals[0][0],residuals[1][-1],atol=1e-12)


    def test_lar_factory( self ):
        """
        Test that the regression factory returns a lar solver and that the
        solver works correctly. That is the last step of least angle
        regression returns the least squares solution
        """
        factory_opts = OptionsList({'regression_type':LEAST_ANGLE_REGRESSION})
        solver = regression_solver_factory(factory_opts)
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs
        for store_history in [False,True]:
            regression_opts = {'regression_type':LEAST_ANGLE_REGRESSION,
                               'verbosity':0,'store-history':store_history}
            regression_opts = OptionsList(regression_opts)
            solver.solve(matrix, rhs, regression_opts)
            coef_lasso = solver.get_solutions_for_all_regularization_params(0)
            coef_lstsq = numpy.linalg.lstsq(matrix, rhs)[0]
            assert numpy.allclose(coef_lasso[:,-1].squeeze(), coef_lstsq.squeeze())

    def test_lar_memory_management(self):
        """
        LAR internally allocates memory for solutions and QR factorization in
        blocks so that if residual tolerance is met before maximum number of
        solutions is reached min (num_rows, num_cols) then memory has not been
        wasted. Memory is allocated in chunks of size memory-chunk-size
        (20 here). Test algorithm works when more than one chunk is needed.
        """
        num_rows = 200
        num_cols = 200
        sparsity = 100
        memory_chunk_size = 20
        solver = LARSolver()
        # Random sparse recovery problem: sparsity-term x, exact rhs.
        matrix = numpy.random.normal(0.,1.,(num_rows,num_cols))
        x = numpy.zeros((num_cols),float)
        I = numpy.random.permutation(num_cols)[:sparsity]
        x[I] = numpy.random.normal(0.,1.,(sparsity))
        rhs = numpy.dot(matrix, x)
        regression_opts = {'regression_type':LEAST_ANGLE_REGRESSION,
                           'verbosity':0,'memory-chunk-size':memory_chunk_size}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_lasso = solver.get_final_solutions()
        assert numpy.allclose(coef_lasso[:,0].squeeze(), x.squeeze())


    def test_lasso_last_step( self ):
        """
        Test that the last step of the lasso variant of
        least angle regression returns the least squares solution
        """
        solver = LARSolver()
        matrix = self.diabetes_matrix # use unnormalized data
        rhs = self.diabetes_rhs
        regression_opts = {'regression_type':LASSO_REGRESSION,'verbosity':0,
                           'normalize-inputs':False}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_lasso = solver.get_solutions_for_all_regularization_params(0)
        coef_lstsq = numpy.linalg.lstsq(matrix, rhs)[0]
        assert numpy.allclose(coef_lasso[:,-1].squeeze(), coef_lstsq.squeeze())

    def _check_covariance_path( self, matrix ):
        """
        Solve a LASSO path on ``matrix`` and check that along the path the
        maximum absolute covariance with the residual is strictly decreasing
        and is (approximately) tied across all active predictors.
        """
        solver = LARSolver()
        rhs = self.diabetes_rhs
        regression_opts = OptionsList({'regression_type':LASSO_REGRESSION})
        solver.solve(matrix, rhs, regression_opts)
        coef_lasso = solver.get_solutions_for_all_regularization_params(0)
        max_covariance_prev = numpy.finfo(float).max
        for i in range( coef_lasso.shape[1] ):
            coef = coef_lasso[:,i]
            residual = rhs - numpy.dot(self.diabetes_matrix, coef)
            covariance = numpy.dot(self.diabetes_matrix.T, residual)

            max_covariance = numpy.max(numpy.absolute(covariance))
            assert max_covariance < max_covariance_prev
            max_covariance_prev = max_covariance
            eps = 1e-3
            # Count predictors whose covariance is tied with the maximum.
            num_non_zeros = len(covariance[max_covariance-eps<
                                           numpy.absolute(covariance)])
            if i < self.diabetes_matrix.shape[1]-1:
                assert num_non_zeros == i+2
            else:
                # no more than max_pred variables can go into the active set
                assert num_non_zeros == self.diabetes_matrix.shape[1]

    def test_lar_path( self ):
        """
        Test max covariance is tied and decreasing

        NOTE(review): despite the name this test has always used
        LASSO_REGRESSION, making it identical to test_lasso_path apart from
        the matrix copy; it may have been intended to use
        LEAST_ANGLE_REGRESSION -- TODO confirm with the original author.
        """
        self._check_covariance_path(self.diabetes_matrix.copy())

    def test_lasso_path( self ):
        """
        Test max covariance is tied and decreasing
        """
        self._check_covariance_path(self.diabetes_matrix)

    def test_non_negative_lasso_path( self ):
        """
        Test when enforcing non-negative coefficients the
        max covariance is tied and decreasing
        """
        # Skip rather than fail: the feature test is not written yet.
        self.skipTest('test of non-negative lasso not implemented')

    def test_lasso_early_exit_tol( self ):
        """
        Test that the algorithm terminates correctly when tolerance is set
        """
        tol = 3.40e3
        solver = LARSolver()
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs
        regression_opts = {'regression_type':LASSO_REGRESSION,
                           'residual-tolerance':tol}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_lasso = solver.get_solutions_for_all_regularization_params(0)
        coef = coef_lasso[:,-1]
        residual = rhs - numpy.dot(self.diabetes_matrix, coef)
        assert numpy.linalg.norm( residual ) < tol

    def test_lasso_early_exit_num_non_zeros( self ):
        """
        Test that the algorithm terminates correctly when the number of nonzeros
        is set
        """
        solver = LARSolver()
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs
        max_nnz = 9
        regression_opts = {'regression_type':LASSO_REGRESSION,
                           'verbosity':0,'max-num-non-zeros':max_nnz}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_lasso = solver.get_solutions_for_all_regularization_params(0)
        assert numpy.count_nonzero( coef_lasso[:,-1] )==max_nnz

    def test_lasso_pce_exact_recovery( self ):
        """
        Test that LASSO exactly recovers the known sparse PCE coefficients.
        """
        # Use the data already loaded by setUp instead of re-reading files.
        solver = LARSolver()
        regression_opts = {'regression_type':LASSO_REGRESSION,
                           'verbosity':0}
        regression_opts = OptionsList(regression_opts)
        solver.solve(self.pce_matrix, self.pce_rhs, regression_opts)
        coef_lasso = solver.get_solutions_for_all_regularization_params(0)
        assert numpy.allclose( coef_lasso[:,-1], self.pce_exact_coef )
238
239
class TestOMP(unittest.TestCase):
    """Tests for the orthogonal matching pursuit (OMP) solver."""

    def setUp( self ):
        # Data sets shared by the tests below are loaded once per test.
        self.diabetes_matrix, self.diabetes_rhs = self.load_diabetes_data()
        self.pce_matrix, self.pce_rhs, self.pce_exact_coef = self.load_pce_data()

    def load_diabetes_data( self ):
        """Return (matrix, rhs) of the diabetes regression data set."""
        base_dir = os.path.join(os.path.dirname(__file__), 'data')
        matrix = numpy.loadtxt(os.path.join(base_dir, 'diabetes_data.csv.gz'))
        rhs = numpy.loadtxt(os.path.join(base_dir, 'diabetes_target.csv.gz'))
        return matrix, rhs

    def load_pce_data( self ):
        """Return (matrix, rhs, exact_coef) of the PCE recovery data set."""
        base_dir = os.path.join(os.path.dirname(__file__), 'data')
        matrix = numpy.loadtxt(os.path.join(base_dir, 'pce_data.csv.gz'))
        rhs = numpy.loadtxt(os.path.join(base_dir, 'pce_target.csv.gz'))
        exact_coef = numpy.loadtxt(os.path.join(base_dir, 'pce_coef.csv.gz'))
        return matrix, rhs, exact_coef

    def test_omp_last_step( self ):
        """
        Test that the last step of orthogonal matching pursuit returns the
        least squares solution
        """
        solver = OMPSolver()
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs
        regression_opts = {'regression_type':ORTHOG_MATCH_PURSUIT,'verbosity':0}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_omp = solver.get_solutions_for_all_regularization_params(0)
        coef_lstsq = numpy.linalg.lstsq(matrix, rhs)[0]
        assert numpy.allclose(coef_omp[:,-1].squeeze(), coef_lstsq.squeeze())

    def test_omp_memory_management( self ):
        """
        OMP internally allocates memory for solutions and QR factorization in
        blocks so that if residual tolerance is met before maximum number of
        solutions is reached min (num_rows, num_cols) then memory has not been
        wasted. Memory is allocated in chunks of size 100. Test algorithm works
        when more than one chunk is needed.
        """
        num_rows = 200
        num_cols = 200
        sparsity = 100
        memory_chunk_size = 100
        solver = OMPSolver()
        # Random sparse recovery problem: sparsity-term x, exact rhs.
        matrix = numpy.random.normal(0.,1.,(num_rows,num_cols))
        x = numpy.zeros((num_cols),float)
        I = numpy.random.permutation(num_cols)[:sparsity]
        x[I] = numpy.random.normal(0.,1.,(sparsity))
        rhs = numpy.dot(matrix, x)
        regression_opts = {'regression_type':ORTHOG_MATCH_PURSUIT,'verbosity':0,
                           'memory-chunk-size':memory_chunk_size}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_omp = solver.get_final_solutions()
        assert numpy.allclose(coef_omp[:,0].squeeze(), x.squeeze())

    def test_omp_path( self ):
        """
        Test residual is always decreasing
        """
        solver = OMPSolver()
        matrix = self.diabetes_matrix.copy()
        rhs = self.diabetes_rhs
        regression_opts = {'regression_type':ORTHOG_MATCH_PURSUIT,'verbosity':0}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_omp = solver.get_solutions_for_all_regularization_params(0)
        # Each OMP step adds a column to the active set, so the residual
        # norm must strictly decrease along the path.
        residual_norm_prev = numpy.finfo(float).max
        for i in range( coef_omp.shape[1] ):
            coef = coef_omp[:,i]
            residual = rhs - numpy.dot(self.diabetes_matrix, coef)
            residual_norm = numpy.linalg.norm( residual )
            assert residual_norm < residual_norm_prev
            residual_norm_prev = residual_norm

    def test_omp_early_exit_tol( self ):
        """
        Test that the algorithm terminates correctly when tolerance is set
        """
        tol = 3.40e3
        solver = OMPSolver()
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs
        regression_opts = {'regression_type':ORTHOG_MATCH_PURSUIT,'verbosity':0,
                           'residual-tolerance':tol}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_omp = solver.get_solutions_for_all_regularization_params(0)
        coef = coef_omp[:,-1]
        residual = rhs - numpy.dot(self.diabetes_matrix, coef)
        assert numpy.linalg.norm( residual ) < tol

    def test_omp_early_exit_num_non_zeros( self ):
        """
        Test that the algorithm terminates correctly when the number of nonzeros
        is set
        """
        solver = OMPSolver()
        matrix = self.diabetes_matrix
        rhs = self.diabetes_rhs
        max_nnz = 9
        # setting max_iters will set max_nnz as no columns of A can be removed
        # once added
        regression_opts = {'regression_type':ORTHOG_MATCH_PURSUIT,'verbosity':0,
                           'max-iters':max_nnz}
        regression_opts = OptionsList(regression_opts)
        solver.solve(matrix, rhs, regression_opts)
        coef_omp = solver.get_solutions_for_all_regularization_params(0)
        assert numpy.count_nonzero( coef_omp[:,-1] )==max_nnz

    def test_omp_pce_exact_recovery( self ):
        """
        Test that OMP exactly recovers the known sparse PCE coefficients.
        """
        # Use the data already loaded by setUp instead of re-reading files.
        solver = OMPSolver()
        regression_opts = {'regression_type':ORTHOG_MATCH_PURSUIT,'verbosity':0}
        regression_opts = OptionsList(regression_opts)
        solver.solve(self.pce_matrix, self.pce_rhs, regression_opts)
        coef_omp = solver.get_solutions_for_all_regularization_params(0)
        assert numpy.allclose( coef_omp[:,-1], self.pce_exact_coef )
366
def single_test_suite():
    """Build a suite running a single hand-picked test case."""
    single = unittest.TestSuite()
    single.addTest(TestLASSO("test_lar_last_step"))
    return single
372
if __name__ == '__main__':
    # Run the full test discovery; use single_test_suite() with a
    # TextTestRunner to run one test in isolation.
    unittest.main()
378