1 /********************* */
2 /*! \file portfolio_util.h
3 ** \verbatim
4 ** Top contributors (to current version):
5 ** Morgan Deters, Kshitij Bansal, Tim King
6 ** This file is part of the CVC4 project.
7 ** Copyright (c) 2009-2019 by the authors listed in the file AUTHORS
8 ** in the top-level source directory) and their institutional affiliations.
9 ** All rights reserved. See the file COPYING in the top-level source
10 ** directory for licensing information.\endverbatim
11 **
12 ** \brief Code relevant only for portfolio builds
13 **/
14
15 #ifndef __CVC4__PORTFOLIO_UTIL_H
16 #define __CVC4__PORTFOLIO_UTIL_H
17
18 #include <queue>
19
20 #include "base/output.h"
21 #include "expr/pickler.h"
22 #include "smt/smt_engine.h"
23 #include "smt_util/lemma_input_channel.h"
24 #include "smt_util/lemma_output_channel.h"
25 #include "util/channel.h"
26
27 namespace CVC4 {
28
29 typedef expr::pickle::Pickle ChannelFormat;
30
31 class PortfolioLemmaOutputChannel : public LemmaOutputChannel {
32 private:
33 std::string d_tag;
34 SharedChannel<ChannelFormat>* d_sharedChannel;
35 expr::pickle::MapPickler d_pickler;
36
37 public:
38 int cnt;
PortfolioLemmaOutputChannel(std::string tag,SharedChannel<ChannelFormat> * c,ExprManager * em,VarMap & to,VarMap & from)39 PortfolioLemmaOutputChannel(std::string tag,
40 SharedChannel<ChannelFormat> *c,
41 ExprManager* em,
42 VarMap& to,
43 VarMap& from) :
44 d_tag(tag),
45 d_sharedChannel(c),
46 d_pickler(em, to, from),
47 cnt(0)
48 {}
49
~PortfolioLemmaOutputChannel()50 ~PortfolioLemmaOutputChannel() {}
51
52 void notifyNewLemma(Expr lemma) override;
53 };/* class PortfolioLemmaOutputChannel */
54
55 class PortfolioLemmaInputChannel : public LemmaInputChannel {
56 private:
57 std::string d_tag;
58 SharedChannel<ChannelFormat>* d_sharedChannel;
59 expr::pickle::MapPickler d_pickler;
60
61 public:
62 PortfolioLemmaInputChannel(std::string tag,
63 SharedChannel<ChannelFormat>* c,
64 ExprManager* em,
65 VarMap& to,
66 VarMap& from);
67
~PortfolioLemmaInputChannel()68 ~PortfolioLemmaInputChannel() {}
69
70 bool hasNewLemma() override;
71 Expr getNewLemma() override;
72
73 };/* class PortfolioLemmaInputChannel */
74
75 class OptionsList {
76 public:
77 OptionsList();
78 ~OptionsList();
79
80 void push_back_copy(const Options& options);
81
82 Options& operator[](size_t position);
83 const Options& operator[](size_t position) const;
84
85 Options& back();
86
87 size_t size() const;
88 private:
89 OptionsList(const OptionsList&) = delete;
90 OptionsList& operator=(const OptionsList&) = delete;
91 std::vector<Options*> d_options;
92 };
93
94 void parseThreadSpecificOptions(OptionsList& list, const Options& opts);
95
96 template<typename T>
sharingManager(unsigned numThreads,SharedChannel<T> * channelsOut[],SharedChannel<T> * channelsIn[],SmtEngine * smts[])97 void sharingManager(unsigned numThreads,
98 SharedChannel<T> *channelsOut[], // out and in with respect
99 SharedChannel<T> *channelsIn[],
100 SmtEngine *smts[]) // to smt engines
101 {
102 Trace("sharing") << "sharing: thread started " << std::endl;
103 std::vector <int> cnt(numThreads); // Debug("sharing")
104
105 std::vector< std::queue<T> > queues;
106 for(unsigned i = 0; i < numThreads; ++i){
107 queues.push_back(std::queue<T>());
108 }
109
110 const unsigned int sharingBroadcastInterval = 1;
111
112 boost::mutex mutex_activity;
113
114 /* Disable interruption, so that we can check manually */
115 boost::this_thread::disable_interruption di;
116
117 while(not boost::this_thread::interruption_requested()) {
118
119 boost::this_thread::sleep
120 (boost::posix_time::milliseconds(sharingBroadcastInterval));
121
122 for(unsigned t = 0; t < numThreads; ++t) {
123
124 /* No activity on this channel */
125 if(channelsOut[t]->empty()) continue;
126
127 /* Alert if channel full, so that we increase sharingChannelSize
128 or decrease sharingBroadcastInterval */
129 assert(not channelsOut[t]->full());
130
131 T data = channelsOut[t]->pop();
132
133 if(Trace.isOn("sharing")) {
134 ++cnt[t];
135 Trace("sharing") << "sharing: Got data. Thread #" << t
136 << ". Chunk " << cnt[t] << std::endl;
137 }
138
139 for(unsigned u = 0; u < numThreads; ++u) {
140 if(u != t){
141 Trace("sharing") << "sharing: adding to queue " << u << std::endl;
142 queues[u].push(data);
143 }
144 }/* end of inner for: broadcast activity */
145
146 } /* end of outer for: look for activity */
147
148 for(unsigned t = 0; t < numThreads; ++t){
149 /* Alert if channel full, so that we increase sharingChannelSize
150 or decrease sharingBroadcastInterval */
151 assert(not channelsIn[t]->full());
152
153 while(!queues[t].empty() && !channelsIn[t]->full()){
154 Trace("sharing") << "sharing: pushing on channel " << t << std::endl;
155 T data = queues[t].front();
156 channelsIn[t]->push(data);
157 queues[t].pop();
158 }
159 }
160 } /* end of infinite while */
161
162 Trace("interrupt")
163 << "sharing thread interrupted, interrupting all smtEngines" << std::endl;
164
165 for(unsigned t = 0; t < numThreads; ++t) {
166 Trace("interrupt") << "Interrupting thread #" << t << std::endl;
167 try{
168 smts[t]->interrupt();
169 }catch(ModalException &e){
170 // It's fine, the thread is probably not there.
171 Trace("interrupt") << "Could not interrupt thread #" << t << std::endl;
172 }
173 }
174
175 Trace("sharing") << "sharing: Interrupted, exiting." << std::endl;
176 }/* sharingManager() */
177
178 }/* CVC4 namespace */
179
180 #endif /* __CVC4__PORTFOLIO_UTIL_H */
181