1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2010, 2013 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package rep;
9 
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 
13 import com.sleepycat.db.*;
14 
15 import repmgrtests.Util;
16 import org.junit.Test;
17 import java.util.*;
18 
19 public class TestMirandaTimeout {
20     class /* struct */ Message {
21         DatabaseEntry ctl, rec;
22         int sourceEID;
23         boolean perm;
Message(DatabaseEntry ctl, DatabaseEntry rec, int eid, boolean p)24         Message(DatabaseEntry ctl, DatabaseEntry rec, int eid, boolean p) {
25             this.ctl = ctl;
26             this.rec = rec;
27             sourceEID = eid;
28             perm = p;
29         }
30     }
31 
32     List<Queue<Message>> queues = new ArrayList<Queue<Message>>();
33     Environment[] envs = new Environment[2];
34 
35     public static final int SELF = Integer.MAX_VALUE;
36 
deferredArrival()37     @Test public void deferredArrival() throws Exception {
38         EnvironmentConfig ec = new EnvironmentConfig();
39         ec.setAllowCreate(true);
40         ec.setInitializeCache(true);
41         ec.setInitializeLocking(true);
42         ec.setInitializeLogging(true);
43         ec.setInitializeReplication(true);
44         ec.setTransactional(true);
45         ec.setThreaded(true);
46         ec.setReplicationNumSites(2);
47         if (Boolean.getBoolean("VERB_REPLICATION"))
48             ec.setVerbose(VerboseConfig.REPLICATION, true);
49 
50         // The EID serves double duty as an index into the array of
51         // ArrayLists which are used as the message queues.  Master is
52         // EID 0 and client is EID 1.
53         //
54         queues.add(new LinkedList<Message>());
55         queues.add(new LinkedList<Message>());
56         ec.setReplicationTransport(SELF, new Transport(0));
57         Environment master = new Environment(Util.mkdir("master"), ec);
58         envs[0] = master;
59         ec.setReplicationTransport(SELF, new Transport(1));
60         Environment client = new Environment(Util.mkdir("client"), ec);
61         envs[1] = client;
62 
63         master.startReplication(null, true);
64         client.startReplication(null, false);
65 
66         processMessages();
67 
68         DatabaseConfig dc = new DatabaseConfig();
69         dc.setTransactional(true);
70         dc.setAllowCreate(true);
71         dc.setType(DatabaseType.BTREE);
72         Database db = master.openDatabase(null, "test.db", null, dc);
73         processMessages();
74 
75         Transaction txn = master.beginTransaction(null, null);
76         DatabaseEntry key = new DatabaseEntry();
77         DatabaseEntry value = new DatabaseEntry();
78         key.setData("mykey".getBytes());
79         value.setData("myvalue".getBytes());
80         db.put(txn, key, value);
81         txn.commit();
82         byte[] token1 = txn.getCommitToken();
83 
84         txn = master.beginTransaction(null, null);
85         key = new DatabaseEntry();
86         value = new DatabaseEntry();
87         key.setData("one,two".getBytes());
88         value.setData("buckle my shoe".getBytes());
89         db.put(txn, key, value);
90         txn.commit();
91         byte[] token2 = txn.getCommitToken();
92 
93         // Since we haven't sent pending msgs to the client yet, the
94         // transaction shouldn't be there yet.
95         //
96         assertEquals(TransactionStatus.TIMEOUT,
97                      client.isTransactionApplied(token1, 0));
98 
99 
100         // Start 2 client threads to wait for the transactions in
101         // reverse order, to make sure the waiting completes in the
102         // correct order.
103         //
104         Client clientTh2 = new Client(client, token2);
105         Thread t2 = new Thread(clientTh2, "clientThread2");
106         t2.start();
107         Thread.sleep(5000);
108 
109         Client clientTh1 = new Client(client, token1);
110         Thread t1 = new Thread(clientTh1, "clientThread1");
111         t1.start();
112         Thread.sleep(5000);
113 
114         processOnePerm();
115         Thread.sleep(5000);
116         processMessages();
117 
118         t1.join();
119         t2.join();
120 
121         assertEquals(TransactionStatus.APPLIED, clientTh1.getResult());
122         assertTrue(clientTh1.getDuration() > 1000 &&
123                    clientTh1.getDuration() < 100000);
124         assertEquals(TransactionStatus.APPLIED, clientTh2.getResult());
125 
126         // We started thread2 (way) before starting thread1, so its
127         // start time should be less (assuming the system isn't
128         // ridiculously busy).  But thread1 still should have
129         // completed (way) before thread2, because its transaction was
130         // replicated first (before a pause).
131         //
132         assertTrue(clientTh2.getStartTime() < clientTh1.getStartTime());
133         assertTrue(clientTh1.getEndTime() < clientTh2.getEndTime());
134 
135         db.close();
136         client.close();
137         master.close();
138     }
139 
140     class Client implements Runnable {
141         private Environment env;
142         private byte[] token;
143         private TransactionStatus result;
144         private long duration, endTime, startTime;
145         public void run() {
146             try {
147                 startTime = System.currentTimeMillis();
148                 result = env.isTransactionApplied(token, 100000000);
149                 endTime = System.currentTimeMillis();
150                 duration = endTime - startTime;
151             } catch (Exception e) {
152                 // if an exception happens, we leave "result"
153                 // unset, which will eventually cause a test failure
154                 // in the parent thread.
155                 e.printStackTrace();
156             }
157         }
158 
159         Client(Environment env, byte[] token) {
160             this.env = env;
161             this.token = token;
162         }
163 
164         long getDuration() { return duration; }
165         long getStartTime() { return startTime; }
166         long getEndTime() { return endTime; }
167         TransactionStatus getResult() { return result; }
168     }
169 
170 
171     class Transport implements ReplicationTransport {
172         private int myEID;
173         Transport(int eid) { myEID = eid; }
174         public int send(Environment env, DatabaseEntry ctl, DatabaseEntry rec,
175                         LogSequenceNumber lsn, int eid, boolean noBuf,
176                         boolean perm, boolean anywhere, boolean isRetry) {
177             int target = 1 - myEID;
178             queues.get(target).add(new Message(ctl, rec, myEID, perm));
179             return (0);
180         }
181     }
182 
183     void processOnePerm() throws Exception {
184         Queue<Message> q = queues.get(1); // the client is site 1
185         while (!q.isEmpty()) {
186             Message m = q.remove();
187             envs[1].processReplicationMessage(m.ctl, m.rec, m.sourceEID);
188             if (m.perm)
189                 return;
190         }
191     }
192 
193     void processMessages() throws Exception {
194         boolean done = false;
195         while (!done) {
196             boolean got = false;
197             for (int eid = 0; eid < 2; eid++) {
198                 Queue<Message> q = queues.get(eid);
199                 while (!q.isEmpty()) {
200                     got = true;
201                     Message m = q.remove();
202                     envs[eid].processReplicationMessage(m.ctl, m.rec, m.sourceEID);
203                 }
204             }
205             if (!got)
206                 done = true;
207         }
208     }
209 }
210