1# Copyright (c) 2021, Oracle and/or its affiliates. 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License, version 2.0, as 5# published by the Free Software Foundation. 6# 7# This program is also distributed with certain software (including 8# but not limited to OpenSSL) that is licensed under separate terms, 9# as designated in a particular file or component or in included license 10# documentation. The authors of MySQL hereby grant you an 11# additional permission to link the program and your derivative works 12# with the separately licensed software that they have included with 13# MySQL. 14# 15# Without limiting anything contained in the foregoing, this file, 16# which is part of MySQL Connector/Python, is also subject to the 17# Universal FOSS Exception, version 1.0, a copy of which can be found at 18# http://oss.oracle.com/licenses/universal-foss-exception. 19# 20# This program is distributed in the hope that it will be useful, but 21# WITHOUT ANY WARRANTY; without even the implied warranty of 22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 23# See the GNU General Public License, version 2.0, for more details. 24# 25# You should have received a copy of the GNU General Public License 26# along with this program; if not, write to the Free Software Foundation, Inc., 27# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 28 29import unittest 30 31import mysqlx 32import tests 33 34 35@unittest.skipIf(tests.MYSQL_VERSION < (8, 0, 25), "XPlugin not compatible") 36class ViewTests(tests.MySQLxTests): 37 """Tests for View. """ 38 39 @tests.foreach_session() 40 def test_is_view(self): 41 """Test is_view().""" 42 self.session.sql("create table t1(a int)").execute() 43 self.session.sql("insert into t1 values(1)").execute() 44 table = self.schema.get_table("t1") 45 self.assertFalse(table.is_view()) 46 self.session.sql("create view v1 as select * from t1").execute() 47 view = self.schema.get_table("v1") 48 self.assertTrue(view.is_view()) 49 self.session.sql("drop view v1").execute() 50 self.session.sql("drop table t1").execute() 51 52 @tests.foreach_session() 53 def test_view_select1(self): 54 self.session.sql("create table t2(id int, name varchar(32))").execute() 55 table = self.schema.get_table("t2") 56 table.insert("id", "name").values(1, "amr").values(2, "abc").execute() 57 self.session.sql("create view v2 as select * from t2").execute() 58 view = self.schema.get_table("v2") 59 result = view.select().where("id ==1").execute() 60 row = result.fetch_all() 61 self.assertEqual(row[0]["name"], "amr") 62 self.session.sql("drop view v2").execute() 63 self.session.sql("drop table t2").execute() 64 65 @tests.foreach_session() 66 def test_view_insert1(self): 67 self.session.sql( 68 "create table t3(_id int, name varchar(32))" 69 ).execute() 70 table = self.schema.get_table("t3") 71 table.insert("_id", "name").values(1, "amr").execute() 72 self.session.sql("create view v3 as select * from t3").execute() 73 view = self.schema.get_table("v3") 74 view.insert("_id", "name").values(2, "abc").execute() 75 result1 = view.select().execute() 76 row1 = result1.fetch_all() 77 self.assertEqual(len(row1), 2) 78 self.assertEqual(row1[0]["name"], "amr") 79 result2 = table.select().execute() 80 row2 = result2.fetch_all() 81 self.assertEqual(len(row2), 2) 82 self.assertEqual(row2[1]["name"], "abc") 83 self.session.sql("drop view v3").execute() 84 self.session.sql("drop table t3").execute() 85 86 @tests.foreach_session() 87 def test_view_update1(self): 88 self.session.sql( 89 "create table t4(_id int, name varchar(32))" 90 ).execute() 91 table = self.schema.get_table("t4") 92 table.insert("_id", "name").values(1, "amr").execute() 93 self.session.sql("create view v4 as select * from t4").execute() 94 view = self.schema.get_table("v4") 95 view.update().set("name", "abc").where("_id == 1").execute() 96 result1 = view.select().execute() 97 row1 = result1.fetch_all() 98 self.assertEqual(row1[0]["name"], "abc") 99 result2 = table.select().execute() 100 row2 = result2.fetch_all() 101 self.assertEqual(row2[0]["name"], "abc") 102 self.session.sql("drop view v4").execute() 103 self.session.sql("drop table t4").execute() 104 105 @tests.foreach_session() 106 def test_view_delete1(self): 107 self.session.sql( 108 "create table t5(_id int, name varchar(32))" 109 ).execute() 110 table = self.schema.get_table("t5") 111 table.insert("_id", "name").values(1, "amr").execute() 112 self.session.sql("create view v5 as select * from t5").execute() 113 view = self.schema.get_table("v5") 114 self.assertEqual(table.count(), 1) 115 try: 116 view.delete().execute() 117 except mysqlx.ProgrammingError: 118 # Expected ProgrammingErorr 119 pass 120 self.assertEqual(table.count(), 1) 121 self.session.sql("drop view v5").execute() 122 self.session.sql("drop table t5").execute() 123 124 @tests.foreach_session() 125 def test_view_insert2(self): 126 self.session.sql("create table t6(a int , b int)").execute() 127 self.session.sql("create table t7(c int,d int)").execute() 128 t1 = self.schema.get_table("t6") 129 t2 = self.schema.get_table("t7") 130 t1.insert("a", "b").values(1, 2).execute() 131 t2.insert("c", "d").values(1, 4).execute() 132 self.session.sql( 133 "create view v6 as select * from t6 join t7 on t6.a = t7.c" 134 ).execute() 135 view = self.schema.get_table("v6") 136 try: 137 view.insert().values(2, 3).execute() 138 except mysqlx.OperationalError: 139 # Expected OperationalError 140 pass 141 self.session.sql("drop view v6").execute() 142 self.session.sql("drop table t6").execute() 143 self.session.sql("drop table t7").execute() 144 145 @tests.foreach_session() 146 def test_view_update2(self): 147 self.session.sql("create table t8(a int , b int)").execute() 148 self.session.sql("create table t9(c int,d int)").execute() 149 t1 = self.schema.get_table("t8") 150 t2 = self.schema.get_table("t9") 151 t1.insert("a", "b").values(1, 2).execute() 152 t2.insert("c", "d").values(1, 4).execute() 153 self.session.sql( 154 "create view v7 as select t9.c as c_t9 from t8 join t9 on t8.a = t9.c" 155 ).execute() 156 view = self.schema.get_table("v7") 157 try: 158 view.update().set("c_t9", 100).execute() 159 except mysqlx.ProgrammingError: 160 # Expected ProgrammingError 161 pass 162 self.session.sql("drop view v7").execute() 163 self.session.sql("drop table t8").execute() 164 self.session.sql("drop table t9").execute() 165 166 @tests.foreach_session() 167 def test_view_update3(self): 168 self.session.sql("create table t12(a int , b int)").execute() 169 self.session.sql("create table t13(c int,d int)").execute() 170 t1 = self.schema.get_table("t12") 171 t2 = self.schema.get_table("t13") 172 t1.insert("a", "b").values(1, 2).execute() 173 t2.insert("c", "d").values(1, 4).execute() 174 self.session.sql( 175 "create view v9 as select t13.c as c_t13 from t12 join t13 on t12.a = t13.c" 176 ).execute() 177 view = self.schema.get_table("v9") 178 try: 179 view.update().set("c_t13", 100).limit(1).execute() 180 except mysqlx.ProgrammingError: 181 # Expected ProgrammingError 182 pass 183 self.session.sql("drop view v9").execute() 184 self.session.sql("drop table t12").execute() 185 self.session.sql("drop table t13").execute() 186 187 @tests.foreach_session() 188 def test_view_delete2(self): 189 self.session.sql("create table t10(a int , b int)").execute() 190 self.session.sql("create table t11(c int,d int)").execute() 191 t1 = self.schema.get_table("t10") 192 t2 = self.schema.get_table("t11") 193 t1.insert("a", "b").values(1, 2).execute() 194 t2.insert("c", "d").values(1, 4).execute() 195 self.session.sql( 196 "create view v8 as select t11.c as c_t11 from t10 join t11 on t10.a = t11.c" 197 ).execute() 198 view = self.schema.get_table("v8") 199 try: 200 view.delete().execute() 201 except mysqlx.ProgrammingError: 202 # Expected ProgrammingError 203 pass 204 self.session.sql("drop view v8").execute() 205 self.session.sql("drop table t10").execute() 206 self.session.sql("drop table t11").execute() 207 208 @tests.foreach_session() 209 def test_collection_view(self): 210 collection = self.schema.create_collection("mycoll1") 211 collection.add( 212 {"_id": 1, "name": "a"}, {"_id": 2, "name": "b"} 213 ).execute() 214 self.session.sql("create view v10 as select * from mycoll1").execute() 215 view = self.schema.get_table("v10") 216 result = view.select().where("_id ==1").execute() 217 result.fetch_all() 218 tables = self.schema.get_tables() 219 tables[0].get_name() 220 self.session.sql("drop view v10").execute() 221 self.schema.drop_collection("mycoll1") 222 223 @tests.foreach_session() 224 def test_view_drop1(self): 225 """Test a valid view drop.""" 226 self.session.sql( 227 "create table t14(_id int, name varchar(32))" 228 ).execute() 229 table = self.schema.get_table("t14") 230 table.insert("_id", "name").values(1, "amr").values(2, "dev").values( 231 3, "efg" 232 ).execute() 233 self.session.sql("create view v14 as select * from t14").execute() 234 view = self.schema.get_table("v14") 235 self.assertEqual(table.count(), 3) 236 self.session.sql("drop view v14").execute() 237 self.assertFalse(view.exists_in_database()) 238 self.session.sql("drop table t14").execute() 239 240 @tests.foreach_session() 241 def test_view_drop2(self): 242 """Test invalid view drop.""" 243 self.session.sql( 244 "create table t15(_id int, name varchar(32))" 245 ).execute() 246 table = self.schema.get_table("t15") 247 table.insert("_id", "name").values(1, "amr").values(2, "dev").values( 248 3, "efg" 249 ).execute() 250 view = self.schema.get_table("v15") 251 self.assertEqual(table.count(), 3) 252 self.assertFalse(view.exists_in_database()) 253 self.session.sql("drop table t15").execute() 254