1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.elections;
9 
10 import java.io.BufferedReader;
11 import java.io.IOException;
12 import java.io.InputStreamReader;
13 import java.io.PrintWriter;
14 import java.nio.channels.Channels;
15 import java.util.logging.Level;
16 
17 import com.sleepycat.je.EnvironmentFailureException;
18 import com.sleepycat.je.JEVersion;
19 import com.sleepycat.je.rep.elections.Proposer.Proposal;
20 import com.sleepycat.je.rep.elections.Protocol.Accept;
21 import com.sleepycat.je.rep.elections.Protocol.Propose;
22 import com.sleepycat.je.rep.elections.Protocol.Value;
23 import com.sleepycat.je.rep.impl.TextProtocol.InvalidMessageException;
24 import com.sleepycat.je.rep.impl.TextProtocol.RequestMessage;
25 import com.sleepycat.je.rep.impl.TextProtocol.ResponseMessage;
26 import com.sleepycat.je.rep.impl.node.RepNode;
27 import com.sleepycat.je.rep.net.DataChannel;
28 import com.sleepycat.je.rep.utilint.ServiceDispatcher;
29 import com.sleepycat.je.utilint.LoggerUtils;
30 
31 /**
32  * Plays the role of Acceptor in the consensus algorithm. It runs in its
33  * own thread listening for and responding to messages sent by Proposers.
34  */
35 public class Acceptor extends ElectionAgentThread {
36 
37     /* The RepNode associated with the Acceptor */
38     private final RepNode repNode;
39 
40     /*
41      * The currently promised proposal. Proposals below this one are rejected.
42      */
43     private Proposal promisedProposal = null;
44 
45     private Value acceptedValue = null;
46 
47     /* Used to return suggestions in response to Propose requests. */
48     private final SuggestionGenerator suggestionGenerator;
49 
50     /* Identifies the Acceptor Service. */
51     public static final String SERVICE_NAME = "Acceptor";
52 
53     /**
54      * Creates an Acceptor
55      */
Acceptor(Protocol protocol, RepNode repNode, SuggestionGenerator suggestionGenerator)56     public Acceptor(Protocol protocol,
57                     RepNode  repNode,
58                     SuggestionGenerator suggestionGenerator) {
59 
60         super(repNode, protocol,
61               "Acceptor Thread " + repNode.getNameIdPair().getName());
62         this.repNode = repNode;
63 
64         this.suggestionGenerator = suggestionGenerator;
65     }
66 
67     /**
68      * The Acceptor thread body.
69      */
70     @Override
run()71     public void run() {
72         final ServiceDispatcher serviceDispatcher =
73             repNode.getServiceDispatcher();
74         serviceDispatcher.register(SERVICE_NAME, channelQueue);
75         LoggerUtils.logMsg
76             (logger, envImpl, formatter, Level.FINE, "Acceptor started");
77         DataChannel channel = null;
78         try {
79             while (true) {
80                 channel = serviceDispatcher.takeChannel
81                     (SERVICE_NAME, true /* block */,
82                      protocol.getReadTimeout());
83 
84                 if (channel == null) {
85                     /* A soft shutdown. */
86                     return;
87                 }
88 
89                 BufferedReader in = null;
90                 PrintWriter out = null;
91                 try {
92                     in = new BufferedReader(
93                         new InputStreamReader(
94                             Channels.newInputStream(channel)));
95                     out = new PrintWriter(
96                         Channels.newOutputStream(channel), true);
97                     String requestLine = in.readLine();
98                     if (requestLine == null) {
99                         LoggerUtils.logMsg(logger, envImpl,
100                                            formatter, Level.FINE,
101                                            "Acceptor: EOF on request");
102                         continue;
103                     }
104                     RequestMessage requestMessage = null;
105                     try {
106                         requestMessage = protocol.parseRequest(requestLine);
107                     } catch (InvalidMessageException e) {
108                         LoggerUtils.logMsg(logger, envImpl,
109                                            formatter, Level.WARNING,
110                                            "Message format error: " +
111                                            e.getMessage());
112                         out.println
113                             (protocol.new ProtocolError(e).wireFormat());
114                         continue;
115                     }
116                     ResponseMessage responseMessage = null;
117                     if (requestMessage.getOp() == protocol.PROPOSE) {
118                         responseMessage = process((Propose) requestMessage);
119                     } else if (requestMessage.getOp() == protocol.ACCEPT) {
120                         responseMessage = process((Accept) requestMessage);
121                     } else if (requestMessage.getOp() == protocol.SHUTDOWN) {
122                         break;
123                     } else {
124                         LoggerUtils.logMsg(logger, envImpl,
125                                            formatter, Level.SEVERE,
126                                            "Unrecognized request: " +
127                                            requestLine);
128                         continue;
129                     }
130 
131                     /*
132                      * The request message may be of an earlier version. If so,
133                      * this node transparently read the older version. JE only
134                      * throws out InvalidMesageException when the version of
135                      * the request message is newer than the current protocol.
136                      * To avoid sending a repsonse that the requester cannot
137                      * understand, we send a response in the same version as
138                      * that of the original request message.
139                      */
140                     responseMessage.setSendVersion
141                         (requestMessage.getSendVersion());
142                     out.println(responseMessage.wireFormat());
143                 } catch (IOException e) {
144                     LoggerUtils.logMsg
145                         (logger, envImpl, formatter, Level.WARNING,
146                          "IO error on socket: " + e.getMessage());
147                     continue;
148                 } finally {
149                     Utils.cleanup(logger, envImpl, formatter, channel, in, out);
150                     cleanup();
151                 }
152             }
153         } catch (InterruptedException e) {
154             if (isShutdown()) {
155                 /* Treat it like a shutdown, exit the thread. */
156                 return;
157             }
158             LoggerUtils.logMsg(logger, envImpl, formatter, Level.WARNING,
159                                "Acceptor unexpected interrupted");
160             throw EnvironmentFailureException.unexpectedException(e);
161         } finally {
162             serviceDispatcher.cancel(SERVICE_NAME);
163             cleanup();
164         }
165     }
166 
167     /**
168      * Responds to a Propose request.
169      *
170      * @param propose the request proposal
171      *
172      * @return the response: a Promise if the request was accepted, a Reject
173      *         otherwise.
174      */
process(Propose propose)175     ResponseMessage process(Propose propose) {
176 
177         if ((promisedProposal != null) &&
178             (promisedProposal.compareTo(propose.getProposal()) > 0)) {
179             LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE,
180                                "Reject Propose: " + propose.getProposal() +
181                                " Promised proposal: " + promisedProposal);
182             return protocol.new Reject(promisedProposal);
183         }
184 
185         promisedProposal = propose.getProposal();
186         final Value suggestedValue = suggestionGenerator.get(promisedProposal);
187         final long suggestionRanking =
188             suggestionGenerator.getRanking(promisedProposal);
189         LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE,
190                            "Promised: " + promisedProposal +
191                            " Suggested Value: " + suggestedValue +
192                            " Suggestion Ranking: " + suggestionRanking);
193         return protocol.new Promise
194                 (promisedProposal,
195                  acceptedValue,
196                  suggestedValue,
197                  suggestionRanking,
198                  repNode.getElectionPriority(),
199                  repNode.getLogVersion(),
200                  JEVersion.CURRENT_VERSION);
201     }
202 
203     /**
204      * Responds to Accept request
205      *
206      * @param accept the request
207      * @return an Accepted or Reject response as appropriate.
208      */
process(Accept accept)209     ResponseMessage process(Accept accept) {
210         if ((promisedProposal != null) &&
211             (promisedProposal.compareTo(accept.getProposal()) != 0)) {
212             LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE,
213                                "Reject Accept: " + accept.getProposal() +
214                                " Promised proposal: " + promisedProposal);
215             return protocol.new Reject(promisedProposal);
216         }
217         acceptedValue = accept.getValue();
218         LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE,
219                            "Promised: " + promisedProposal + " Accepted: " +
220                            accept.getProposal() + " Value: " + acceptedValue);
221         return protocol.new Accepted(accept.getProposal(), acceptedValue);
222     }
223 
224     public interface SuggestionGenerator {
225 
226         /**
227          * Used to generate a suggested value for use by a Proposer. It's a
228          * hint.  The proposal argument may be used to freeze values like the
229          * VLSN number from advancing (if they were used in the ranking) until
230          * an election has completed.
231          *
232          * @param proposal the Proposal for which the value is being suggested.
233          *
234          * @return the suggested value.
235          */
get(Proposal proposal)236         abstract Value get(Proposal proposal);
237 
238         /**
239          * The importance associated with the above suggestion. Acceptors have
240          * to agree on a common system for ranking importance so that the
241          * relative importance of different suggestions can be meaningfully
242          * compared.
243          *
244          * @param the proposal associated with the ranking
245          *
246          * @return the importance of the suggestion as a number
247          */
getRanking(Proposal proposal)248         abstract long getRanking(Proposal proposal);
249     }
250 }
251