1# Copyright (c) 2021, Oracle and/or its affiliates.
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License, version 2.0, as
5# published by the Free Software Foundation.
6#
7# This program is also distributed with certain software (including
8# but not limited to OpenSSL) that is licensed under separate terms,
9# as designated in a particular file or component or in included license
10# documentation.  The authors of MySQL hereby grant you an
11# additional permission to link the program and your derivative works
12# with the separately licensed software that they have included with
13# MySQL.
14#
15# Without limiting anything contained in the foregoing, this file,
16# which is part of MySQL Connector/Python, is also subject to the
17# Universal FOSS Exception, version 1.0, a copy of which can be found at
18# http://oss.oracle.com/licenses/universal-foss-exception.
19#
20# This program is distributed in the hope that it will be useful, but
21# WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
23# See the GNU General Public License, version 2.0, for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program; if not, write to the Free Software Foundation, Inc.,
27# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
28
29import unittest
30
31import mysqlx
32import tests
33
34
35@unittest.skipIf(tests.MYSQL_VERSION < (8, 0, 25), "XPlugin not compatible")
36class ViewTests(tests.MySQLxTests):
37    """Tests for View. """
38
39    @tests.foreach_session()
40    def test_is_view(self):
41        """Test is_view()."""
42        self.session.sql("create table t1(a int)").execute()
43        self.session.sql("insert into t1 values(1)").execute()
44        table = self.schema.get_table("t1")
45        self.assertFalse(table.is_view())
46        self.session.sql("create view v1 as select * from t1").execute()
47        view = self.schema.get_table("v1")
48        self.assertTrue(view.is_view())
49        self.session.sql("drop view v1").execute()
50        self.session.sql("drop table t1").execute()
51
52    @tests.foreach_session()
53    def test_view_select1(self):
54        self.session.sql("create table t2(id int, name varchar(32))").execute()
55        table = self.schema.get_table("t2")
56        table.insert("id", "name").values(1, "amr").values(2, "abc").execute()
57        self.session.sql("create view v2 as select * from t2").execute()
58        view = self.schema.get_table("v2")
59        result = view.select().where("id ==1").execute()
60        row = result.fetch_all()
61        self.assertEqual(row[0]["name"], "amr")
62        self.session.sql("drop view v2").execute()
63        self.session.sql("drop table t2").execute()
64
65    @tests.foreach_session()
66    def test_view_insert1(self):
67        self.session.sql(
68            "create table t3(_id int, name varchar(32))"
69        ).execute()
70        table = self.schema.get_table("t3")
71        table.insert("_id", "name").values(1, "amr").execute()
72        self.session.sql("create view v3 as select * from t3").execute()
73        view = self.schema.get_table("v3")
74        view.insert("_id", "name").values(2, "abc").execute()
75        result1 = view.select().execute()
76        row1 = result1.fetch_all()
77        self.assertEqual(len(row1), 2)
78        self.assertEqual(row1[0]["name"], "amr")
79        result2 = table.select().execute()
80        row2 = result2.fetch_all()
81        self.assertEqual(len(row2), 2)
82        self.assertEqual(row2[1]["name"], "abc")
83        self.session.sql("drop view v3").execute()
84        self.session.sql("drop table t3").execute()
85
86    @tests.foreach_session()
87    def test_view_update1(self):
88        self.session.sql(
89            "create table t4(_id int, name varchar(32))"
90        ).execute()
91        table = self.schema.get_table("t4")
92        table.insert("_id", "name").values(1, "amr").execute()
93        self.session.sql("create view v4 as select * from t4").execute()
94        view = self.schema.get_table("v4")
95        view.update().set("name", "abc").where("_id == 1").execute()
96        result1 = view.select().execute()
97        row1 = result1.fetch_all()
98        self.assertEqual(row1[0]["name"], "abc")
99        result2 = table.select().execute()
100        row2 = result2.fetch_all()
101        self.assertEqual(row2[0]["name"], "abc")
102        self.session.sql("drop view v4").execute()
103        self.session.sql("drop table t4").execute()
104
105    @tests.foreach_session()
106    def test_view_delete1(self):
107        self.session.sql(
108            "create table t5(_id int, name varchar(32))"
109        ).execute()
110        table = self.schema.get_table("t5")
111        table.insert("_id", "name").values(1, "amr").execute()
112        self.session.sql("create view v5 as select * from t5").execute()
113        view = self.schema.get_table("v5")
114        self.assertEqual(table.count(), 1)
115        try:
116            view.delete().execute()
117        except mysqlx.ProgrammingError:
118            # Expected ProgrammingErorr
119            pass
120        self.assertEqual(table.count(), 1)
121        self.session.sql("drop view v5").execute()
122        self.session.sql("drop table t5").execute()
123
124    @tests.foreach_session()
125    def test_view_insert2(self):
126        self.session.sql("create table t6(a int , b int)").execute()
127        self.session.sql("create table t7(c int,d int)").execute()
128        t1 = self.schema.get_table("t6")
129        t2 = self.schema.get_table("t7")
130        t1.insert("a", "b").values(1, 2).execute()
131        t2.insert("c", "d").values(1, 4).execute()
132        self.session.sql(
133            "create view v6 as select * from t6 join t7 on t6.a = t7.c"
134        ).execute()
135        view = self.schema.get_table("v6")
136        try:
137            view.insert().values(2, 3).execute()
138        except mysqlx.OperationalError:
139            # Expected OperationalError
140            pass
141        self.session.sql("drop view v6").execute()
142        self.session.sql("drop table t6").execute()
143        self.session.sql("drop table t7").execute()
144
145    @tests.foreach_session()
146    def test_view_update2(self):
147        self.session.sql("create table t8(a int , b int)").execute()
148        self.session.sql("create table t9(c int,d int)").execute()
149        t1 = self.schema.get_table("t8")
150        t2 = self.schema.get_table("t9")
151        t1.insert("a", "b").values(1, 2).execute()
152        t2.insert("c", "d").values(1, 4).execute()
153        self.session.sql(
154            "create view v7 as select t9.c as c_t9 from t8 join t9 on t8.a = t9.c"
155        ).execute()
156        view = self.schema.get_table("v7")
157        try:
158            view.update().set("c_t9", 100).execute()
159        except mysqlx.ProgrammingError:
160            # Expected ProgrammingError
161            pass
162        self.session.sql("drop view v7").execute()
163        self.session.sql("drop table t8").execute()
164        self.session.sql("drop table t9").execute()
165
166    @tests.foreach_session()
167    def test_view_update3(self):
168        self.session.sql("create table t12(a int , b int)").execute()
169        self.session.sql("create table t13(c int,d int)").execute()
170        t1 = self.schema.get_table("t12")
171        t2 = self.schema.get_table("t13")
172        t1.insert("a", "b").values(1, 2).execute()
173        t2.insert("c", "d").values(1, 4).execute()
174        self.session.sql(
175            "create view v9 as select t13.c as c_t13 from t12 join t13 on t12.a = t13.c"
176        ).execute()
177        view = self.schema.get_table("v9")
178        try:
179            view.update().set("c_t13", 100).limit(1).execute()
180        except mysqlx.ProgrammingError:
181            # Expected ProgrammingError
182            pass
183        self.session.sql("drop view v9").execute()
184        self.session.sql("drop table t12").execute()
185        self.session.sql("drop table t13").execute()
186
187    @tests.foreach_session()
188    def test_view_delete2(self):
189        self.session.sql("create table t10(a int , b int)").execute()
190        self.session.sql("create table t11(c int,d int)").execute()
191        t1 = self.schema.get_table("t10")
192        t2 = self.schema.get_table("t11")
193        t1.insert("a", "b").values(1, 2).execute()
194        t2.insert("c", "d").values(1, 4).execute()
195        self.session.sql(
196            "create view v8 as select t11.c as c_t11 from t10 join t11 on t10.a = t11.c"
197        ).execute()
198        view = self.schema.get_table("v8")
199        try:
200            view.delete().execute()
201        except mysqlx.ProgrammingError:
202            # Expected ProgrammingError
203            pass
204        self.session.sql("drop view v8").execute()
205        self.session.sql("drop table t10").execute()
206        self.session.sql("drop table t11").execute()
207
208    @tests.foreach_session()
209    def test_collection_view(self):
210        collection = self.schema.create_collection("mycoll1")
211        collection.add(
212            {"_id": 1, "name": "a"}, {"_id": 2, "name": "b"}
213        ).execute()
214        self.session.sql("create view v10 as select * from mycoll1").execute()
215        view = self.schema.get_table("v10")
216        result = view.select().where("_id ==1").execute()
217        result.fetch_all()
218        tables = self.schema.get_tables()
219        tables[0].get_name()
220        self.session.sql("drop view v10").execute()
221        self.schema.drop_collection("mycoll1")
222
223    @tests.foreach_session()
224    def test_view_drop1(self):
225        """Test a valid view drop."""
226        self.session.sql(
227            "create table t14(_id int, name varchar(32))"
228        ).execute()
229        table = self.schema.get_table("t14")
230        table.insert("_id", "name").values(1, "amr").values(2, "dev").values(
231            3, "efg"
232        ).execute()
233        self.session.sql("create view v14 as select * from t14").execute()
234        view = self.schema.get_table("v14")
235        self.assertEqual(table.count(), 3)
236        self.session.sql("drop view v14").execute()
237        self.assertFalse(view.exists_in_database())
238        self.session.sql("drop table t14").execute()
239
240    @tests.foreach_session()
241    def test_view_drop2(self):
242        """Test invalid view drop."""
243        self.session.sql(
244            "create table t15(_id int, name varchar(32))"
245        ).execute()
246        table = self.schema.get_table("t15")
247        table.insert("_id", "name").values(1, "amr").values(2, "dev").values(
248            3, "efg"
249        ).execute()
250        view = self.schema.get_table("v15")
251        self.assertEqual(table.count(), 3)
252        self.assertFalse(view.exists_in_database())
253        self.session.sql("drop table t15").execute()
254