1"""
2Copyright 2013 Steven Diamond, 2017 Robin Verschueren
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15"""
16
17from collections import namedtuple
18
19import numpy as np
20from scipy.sparse import dok_matrix
21
22import cvxpy.settings as s
23from cvxpy.constraints import SOC
24from cvxpy.reductions.solution import Solution, failure_solution
25from cvxpy.reductions.solvers import utilities
26from cvxpy.reductions.solvers.conic_solvers.conic_solver import (
27    ConicSolver, dims_to_solver_dict,)
28
29# Values used to distinguish between linear and quadratic constraints.
30_LIN, _QUAD = 0, 1
31# For internal bookkeeping, we have to separate linear indices from
32# quadratic indices. The "cpx_constrs" member of the solution will
33# contain namedtuples of (constr_type, index) where constr_type is either
34# _LIN or _QUAD.
35_CpxConstr = namedtuple("_CpxConstr", ["constr_type", "index"])
36
37
38def set_parameters(model, solver_opts) -> None:
39    """Sets CPLEX parameters."""
40    # TODO: Parameter support is functional, but perhaps not ideal.
41    # The user must pass parameter names as used in the CPLEX Python
42    # API, and raw values (i.e., no enum support).
43    kwargs = sorted(solver_opts.keys())
44    if "cplex_params" in kwargs:
45        for param, value in solver_opts["cplex_params"].items():
46            try:
47                eval("model.parameters.{0}.set({1})".format(param, value))
48            except AttributeError:
49                raise ValueError(
50                    "invalid CPLEX parameter, value pair ({0}, {1})".format(
51                        param, value))
52        kwargs.remove("cplex_params")
53    if "cplex_filename" in kwargs:
54        filename = solver_opts["cplex_filename"]
55        if filename:
56            model.write(filename)
57        kwargs.remove("cplex_filename")
58    if kwargs:
59        raise ValueError("invalid keyword-argument '{0}'".format(kwargs[0]))
60
61
62def hide_solver_output(model) -> None:
63    """Set CPLEX verbosity level (either on or off)."""
64    # By default the output will be sent to stdout. Setting the output
65    # streams to None will prevent any output from being shown.
66    model.set_results_stream(None)
67    model.set_warning_stream(None)
68    model.set_error_stream(None)
69    model.set_log_stream(None)
70
71
72def _handle_solve_status(model, solstat):
73    """Map CPLEX MIP solution status codes to non-MIP status codes."""
74    status = model.solution.status
75
76    # With CPLEX 12.10, some benders status codes were removed.
77    if model.get_versionnumber() < 12100000:
78        unbounded_status_codes = (
79            status.MIP_unbounded,
80            status.MIP_benders_master_unbounded,
81            status.benders_master_unbounded
82        )
83    else:
84        unbounded_status_codes = (
85            status.MIP_unbounded,
86        )
87
88    if solstat == status.MIP_optimal:
89        return status.optimal
90    elif solstat == status.MIP_infeasible:
91        return status.infeasible
92    elif solstat in (status.MIP_time_limit_feasible,
93                     status.MIP_time_limit_infeasible):
94        return status.abort_time_limit
95    elif solstat in (status.MIP_dettime_limit_feasible,
96                     status.MIP_dettime_limit_infeasible):
97        return status.abort_dettime_limit
98    elif solstat in (status.MIP_abort_feasible,
99                     status.MIP_abort_infeasible):
100        return status.abort_user
101    elif solstat == status.MIP_optimal_infeasible:
102        return status.optimal_infeasible
103    elif solstat == status.MIP_infeasible_or_unbounded:
104        return status.infeasible_or_unbounded
105    elif solstat in unbounded_status_codes:
106        return status.unbounded
107    elif solstat in (status.feasible_relaxed_sum,
108                     status.MIP_feasible_relaxed_sum,
109                     status.optimal_relaxed_sum,
110                     status.MIP_optimal_relaxed_sum,
111                     status.feasible_relaxed_inf,
112                     status.MIP_feasible_relaxed_inf,
113                     status.optimal_relaxed_inf,
114                     status.MIP_optimal_relaxed_inf,
115                     status.feasible_relaxed_quad,
116                     status.MIP_feasible_relaxed_quad,
117                     status.optimal_relaxed_quad,
118                     status.MIP_optimal_relaxed_quad):
119        raise AssertionError(
120            "feasopt status encountered: {0}".format(solstat))
121    elif solstat in (status.conflict_feasible,
122                     status.conflict_minimal,
123                     status.conflict_abort_contradiction,
124                     status.conflict_abort_time_limit,
125                     status.conflict_abort_dettime_limit,
126                     status.conflict_abort_iteration_limit,
127                     status.conflict_abort_node_limit,
128                     status.conflict_abort_obj_limit,
129                     status.conflict_abort_memory_limit,
130                     status.conflict_abort_user):
131        raise AssertionError(
132            "conflict refiner status encountered: {0}".format(solstat))
133    elif solstat == status.relaxation_unbounded:
134        return status.relaxation_unbounded
135    elif solstat in (status.feasible,
136                     status.MIP_feasible):
137        return status.feasible
138    elif solstat == status.benders_num_best:
139        return status.num_best
140    else:
141        return solstat
142
143
144def get_status(model):
145    """Map CPLEX status to CPXPY status."""
146    pfeas = model.solution.is_primal_feasible()
147    # NOTE: dfeas is always false for a MIP.
148    dfeas = model.solution.is_dual_feasible()
149    status = model.solution.status
150    solstat = _handle_solve_status(model, model.solution.get_status())
151    if solstat in (status.node_limit_infeasible,
152                   status.fail_infeasible,
153                   status.mem_limit_infeasible,
154                   status.fail_infeasible_no_tree,
155                   status.num_best):
156        return s.SOLVER_ERROR
157    elif solstat in (status.abort_user,
158                     status.abort_iteration_limit,
159                     status.abort_time_limit,
160                     status.abort_dettime_limit,
161                     status.abort_obj_limit,
162                     status.abort_primal_obj_limit,
163                     status.abort_dual_obj_limit,
164                     status.abort_relaxed,
165                     status.first_order):
166        if pfeas:
167            return s.OPTIMAL_INACCURATE
168        else:
169            return s.SOLVER_ERROR
170    elif solstat in (status.node_limit_feasible,
171                     status.solution_limit,
172                     status.populate_solution_limit,
173                     status.fail_feasible,
174                     status.mem_limit_feasible,
175                     status.fail_feasible_no_tree,
176                     status.feasible):
177        if dfeas:
178            return s.OPTIMAL
179        else:
180            return s.OPTIMAL_INACCURATE
181    elif solstat in (status.optimal,
182                     status.optimal_tolerance,
183                     status.optimal_infeasible,
184                     status.optimal_populated,
185                     status.optimal_populated_tolerance):
186        return s.OPTIMAL
187    elif solstat in (status.infeasible,
188                     status.optimal_relaxed_sum,
189                     status.optimal_relaxed_inf,
190                     status.optimal_relaxed_quad):
191        return s.INFEASIBLE
192    elif solstat in (status.feasible_relaxed_quad,
193                     status.feasible_relaxed_inf,
194                     status.feasible_relaxed_sum):
195        return s.SOLVER_ERROR
196    elif solstat in (status.unbounded,
197                     status.infeasible_or_unbounded):
198        return s.UNBOUNDED
199    else:
200        return s.SOLVER_ERROR
201
202
203class CPLEX(ConicSolver):
204    """An interface for the CPLEX solver.
205    """
206
207    # Solver capabilities.
208    MIP_CAPABLE = True
209    SUPPORTED_CONSTRAINTS = ConicSolver.SUPPORTED_CONSTRAINTS + [SOC]
210
211    def name(self):
212        """The name of the solver. """
213        return s.CPLEX
214
215    def import_solver(self) -> None:
216        """Imports the solver."""
217        import cplex
218        cplex  # For flake8
219
220    def accepts(self, problem) -> bool:
221        """Can CPLEX solve the problem?
222        """
223        # TODO check if is matrix stuffed.
224        if not problem.objective.args[0].is_affine():
225            return False
226        for constr in problem.constraints:
227            if type(constr) not in CPLEX.SUPPORTED_CONSTRAINTS:
228                return False
229            for arg in constr.args:
230                if not arg.is_affine():
231                    return False
232        return True
233
234    def apply(self, problem):
235        """Returns a new problem and data for inverting the new solution.
236
237        Returns
238        -------
239        tuple
240            (dict of arguments needed for the solver, inverse data)
241        """
242        data, inv_data = super(CPLEX, self).apply(problem)
243        variables = problem.x
244        data[s.BOOL_IDX] = [int(t[0]) for t in variables.boolean_idx]
245        data[s.INT_IDX] = [int(t[0]) for t in variables.integer_idx]
246        inv_data['is_mip'] = data[s.BOOL_IDX] or data[s.INT_IDX]
247
248        return data, inv_data
249
250    def invert(self, solution, inverse_data):
251        """Returns the solution to the original problem given the inverse_data.
252        """
253        model = solution["model"]
254        attr = {}
255        if s.SOLVE_TIME in solution:
256            attr[s.SOLVE_TIME] = solution[s.SOLVE_TIME]
257
258        status = get_status(model)
259
260        primal_vars = None
261        dual_vars = None
262        if status in s.SOLUTION_PRESENT:
263            opt_val = (model.solution.get_objective_value() +
264                       inverse_data[s.OFFSET])
265
266            x = np.array(model.solution.get_values())
267            primal_vars = {inverse_data[CPLEX.VAR_ID]: x}
268
269            if not inverse_data['is_mip']:
270                # The dual values are retrieved in the order that the
271                # constraints were added in solve_via_data() below. We
272                # must be careful to map them to inverse_data[EQ_CONSTR]
273                # followed by inverse_data[NEQ_CONSTR] accordingly.
274                y = -np.array(model.solution.get_dual_values())
275                dual_vars = utilities.get_dual_values(
276                    y,
277                    utilities.extract_dual_value,
278                    inverse_data[CPLEX.EQ_CONSTR] + inverse_data[CPLEX.NEQ_CONSTR])
279
280            return Solution(status, opt_val, primal_vars, dual_vars, attr)
281        else:
282            return failure_solution(status)
283
284    def solve_via_data(self, data, warm_start: bool, verbose: bool, solver_opts, solver_cache=None):
285        import cplex
286
287        c = data[s.C]
288        b = data[s.B]
289        A = dok_matrix(data[s.A])
290        # Save the dok_matrix.
291        data[s.A] = A
292        dims = dims_to_solver_dict(data[s.DIMS])
293
294        n = c.shape[0]
295
296        model = cplex.Cplex()
297        variables = []
298        # cpx_constrs will contain CpxConstr namedtuples (see above).
299        cpx_constrs = []
300        vtype = []
301        if data[s.BOOL_IDX] or data[s.INT_IDX]:
302            for i in range(n):
303                # Set variable type.
304                if i in data[s.BOOL_IDX]:
305                    vtype.append('B')
306                elif i in data[s.INT_IDX]:
307                    vtype.append('I')
308                else:
309                    vtype.append('C')
310        else:
311            # If we specify types (even with 'C'), then the problem will
312            # be interpreted as a MIP. Leaving vtype as an empty list
313            # here, will ensure that the problem type remains an LP.
314            pass
315        # Add the variables in a batch
316        variables = list(model.variables.add(
317            obj=[c[i] for i in range(n)],
318            lb=[-cplex.infinity]*n,  # default LB is 0
319            ub=[cplex.infinity]*n,
320            types="".join(vtype),
321            names=["x_%d" % i for i in range(n)]))
322
323        # Add equality constraints
324        cpx_constrs += [_CpxConstr(_LIN, x)
325                        for x in self.add_model_lin_constr(
326                                model, variables,
327                                range(dims[s.EQ_DIM]),
328                                'E', A, b)]
329
330        # Add inequality (<=) constraints
331        leq_start = dims[s.EQ_DIM]
332        leq_end = dims[s.EQ_DIM] + dims[s.LEQ_DIM]
333        cpx_constrs += [_CpxConstr(_LIN, x)
334                        for x in self.add_model_lin_constr(
335                                model, variables,
336                                range(leq_start, leq_end),
337                                'L', A, b)]
338
339        # Add SOC constraints
340        soc_start = leq_end
341        for constr_len in dims[s.SOC_DIM]:
342            soc_end = soc_start + constr_len
343            soc_constr, new_leq, new_vars = self.add_model_soc_constr(
344                model, variables, range(soc_start, soc_end), A, b)
345            cpx_constrs.append(_CpxConstr(_QUAD, soc_constr))
346            cpx_constrs += [_CpxConstr(_LIN, x) for x in new_leq]
347            variables += new_vars
348            soc_start += constr_len
349
350        # Set verbosity
351        if not verbose:
352            hide_solver_output(model)
353
354        # For CVXPY, we set the qcpduals parameter here, but the user can
355        # easily override it via the "cplex_params" solver option (see
356        # set_parameters function).
357        model.parameters.preprocessing.qcpduals.set(
358            model.parameters.preprocessing.qcpduals.values.force)
359
360        # Set parameters
361        set_parameters(model, solver_opts)
362
363        # Solve problem
364        solution = {"model": model}
365        try:
366            start_time = model.get_time()
367            model.solve()
368            solution[s.SOLVE_TIME] = model.get_time() - start_time
369        except Exception:
370            pass
371
372        return solution
373
374    def add_model_lin_constr(self, model, variables,
375                             rows, ctype, mat, vec):
376        """Adds EQ/LEQ constraints to the model using the data from mat and vec.
377
378        Parameters
379        ----------
380        model : CPLEX model
381            The problem model.
382        variables : list
383            The problem variables.
384        rows : range
385            The rows to be constrained.
386        ctype : CPLEX constraint type
387            The type of constraint.
388        mat : SciPy COO matrix
389            The matrix representing the constraints.
390        vec : NDArray
391            The RHS part of the constraints.
392
393        Returns
394        -------
395        list
396            A list of new linear constraint indices.
397        """
398        constr, lin_expr, rhs = [], [], []
399        csr = mat.tocsr()
400        for i in rows:
401            ind = [variables[x] for x in csr[i].indices]
402            val = [x for x in csr[i].data]
403            lin_expr.append([ind, val])
404            rhs.append(vec[i])
405        # For better performance, we add the constraints in a batch.
406        if lin_expr:
407            assert len(lin_expr) == len(rhs)
408            constr.extend(list(
409                model.linear_constraints.add(
410                    lin_expr=lin_expr,
411                    senses=ctype * len(lin_expr),
412                    rhs=rhs)))
413        return constr
414
415    def add_model_soc_constr(self, model, variables,
416                             rows, mat, vec):
417        """Adds SOC constraint to the model using the data from mat and vec.
418
419        Parameters
420        ----------
421        model : CPLEX model
422            The problem model.
423        variables : list
424            The problem variables.
425        rows : range
426            The rows to be constrained.
427        mat : SciPy COO matrix
428            The matrix representing the constraints.
429        vec : NDArray
430            The RHS part of the constraints.
431
432        Returns
433        -------
434        tuple
435            A tuple of (a new quadratic constraint index, a list of new
436            supporting linear constr indices, and a list of new
437            supporting variable indices).
438        """
439        import cplex
440
441        # Assume first expression (i.e. t) is nonzero.
442        lin_expr_list, soc_vars, lin_rhs = [], [], []
443        csr = mat.tocsr()
444        for i in rows:
445            ind = [variables[x] for x in csr[i].indices]
446            val = [x for x in csr[i].data]
447            # Ignore empty constraints.
448            if ind:
449                lin_expr_list.append((ind, val))
450            else:
451                lin_expr_list.append(None)
452            lin_rhs.append(vec[i])
453
454        # Make a variable and equality constraint for each term.
455        soc_vars, is_first = [], True
456        for i in rows:
457            if is_first:
458                lb = [0.0]
459                names = ["soc_t_%d" % i]
460                is_first = False
461            else:
462                lb = [-cplex.infinity]
463                names = ["soc_x_%d" % i]
464            soc_vars.extend(list(model.variables.add(
465                obj=[0],
466                lb=lb,
467                ub=[cplex.infinity],
468                types="",
469                names=names)))
470
471        new_lin_constrs = []
472        for i, expr in enumerate(lin_expr_list):
473            if expr is None:
474                ind = [soc_vars[i]]
475                val = [1.0]
476            else:
477                ind, val = expr
478                ind.append(soc_vars[i])
479                val.append(1.0)
480            new_lin_constrs.extend(list(
481                model.linear_constraints.add(
482                    lin_expr=[cplex.SparsePair(ind=ind, val=val)],
483                    senses="E",
484                    rhs=[lin_rhs[i]])))
485
486        assert len(soc_vars) > 0
487        qconstr = model.quadratic_constraints.add(
488            lin_expr=cplex.SparsePair(ind=[], val=[]),
489            quad_expr=cplex.SparseTriple(
490                ind1=soc_vars,
491                ind2=soc_vars,
492                val=[-1.0] + [1.0] * (len(soc_vars) - 1)),
493            sense="L",
494            rhs=0.0,
495            name="")
496        return (qconstr, new_lin_constrs, soc_vars)
497