1 /**
2  * SPDX-License-Identifier: GPL-2.0-or-later
3  *
4  * This file is part of osm2pgsql (https://osm2pgsql.org/).
5  *
6  * Copyright (C) 2006-2021 by the osm2pgsql developer community.
7  * For a full list of authors see the git log.
8  */
9 
10 #include "flex-table.hpp"
11 #include "format.hpp"
12 #include "logging.hpp"
13 #include "pgsql-helper.hpp"
14 #include "util.hpp"
15 
16 #include <cassert>
17 #include <string>
18 
type_to_char(osmium::item_type type)19 char const *type_to_char(osmium::item_type type) noexcept
20 {
21     switch (type) {
22     case osmium::item_type::node:
23         return "N";
24     case osmium::item_type::way:
25         return "W";
26     case osmium::item_type::relation:
27         return "R";
28     default:
29         break;
30     }
31     return "X";
32 }
33 
has_multicolumn_id_index() const34 bool flex_table_t::has_multicolumn_id_index() const noexcept
35 {
36     return m_columns[0].type() == table_column_type::id_type;
37 }
38 
id_column_names() const39 std::string flex_table_t::id_column_names() const
40 {
41     std::string name;
42 
43     if (!has_id_column()) {
44         return name;
45     }
46 
47     name = m_columns[0].name();
48     if (has_multicolumn_id_index()) {
49         name += ',';
50         name += m_columns[1].name();
51     }
52 
53     return name;
54 }
55 
full_name() const56 std::string flex_table_t::full_name() const
57 {
58     return qualified_name(schema(), name());
59 }
60 
full_tmp_name() const61 std::string flex_table_t::full_tmp_name() const
62 {
63     return qualified_name(schema(), name() + "_tmp");
64 }
65 
add_column(std::string const & name,std::string const & type,std::string const & sql_type)66 flex_table_column_t &flex_table_t::add_column(std::string const &name,
67                                               std::string const &type,
68                                               std::string const &sql_type)
69 {
70     // id_type (optional) and id_num must always be the first columns
71     assert(type != "id_type" || m_columns.empty());
72     assert(type != "id_num" || m_columns.empty() ||
73            (m_columns.size() == 1 &&
74             m_columns[0].type() == table_column_type::id_type));
75 
76     m_columns.emplace_back(name, type, sql_type);
77     auto &column = m_columns.back();
78 
79     if (column.is_geometry_column()) {
80         m_geom_column = m_columns.size() - 1;
81         column.set_not_null();
82     }
83 
84     return column;
85 }
86 
build_sql_prepare_get_wkb() const87 std::string flex_table_t::build_sql_prepare_get_wkb() const
88 {
89     if (has_multicolumn_id_index()) {
90         return "PREPARE get_wkb(char(1), bigint) AS"
91                " SELECT \"{}\" FROM {} WHERE \"{}\" = $1 AND \"{}\" = $2"_format(
92                    geom_column().name(), full_name(), m_columns[0].name(),
93                    m_columns[1].name());
94     }
95 
96     return "PREPARE get_wkb(bigint) AS"
97            " SELECT \"{}\" FROM {} WHERE \"{}\" = $1"_format(
98                geom_column().name(), full_name(), id_column_names());
99 }
100 
101 std::string
build_sql_create_table(table_type ttype,std::string const & table_name) const102 flex_table_t::build_sql_create_table(table_type ttype,
103                                      std::string const &table_name) const
104 {
105     assert(!m_columns.empty());
106 
107     std::string sql = "CREATE {} TABLE IF NOT EXISTS {} ("_format(
108         ttype == table_type::interim ? "UNLOGGED" : "", table_name);
109 
110     for (auto const &column : m_columns) {
111         // create_only columns are only created in permanent, not in the
112         // interim tables
113         if (ttype == table_type::permanent || !column.create_only()) {
114             sql += column.sql_create();
115         }
116     }
117 
118     assert(sql.back() == ',');
119     sql.back() = ')';
120 
121     if (ttype == table_type::interim) {
122         sql += " WITH (autovacuum_enabled = off)";
123     }
124 
125     sql += tablespace_clause(m_data_tablespace);
126 
127     return sql;
128 }
129 
build_sql_column_list() const130 std::string flex_table_t::build_sql_column_list() const
131 {
132     assert(!m_columns.empty());
133 
134     std::string result;
135     for (auto const &column : m_columns) {
136         if (!column.create_only()) {
137             result += '"';
138             result += column.name();
139             result += '"';
140             result += ',';
141         }
142     }
143     result.resize(result.size() - 1);
144 
145     return result;
146 }
147 
build_sql_create_id_index() const148 std::string flex_table_t::build_sql_create_id_index() const
149 {
150     return "CREATE INDEX ON {} USING BTREE ({}) {}"_format(
151         full_name(), id_column_names(), tablespace_clause(index_tablespace()));
152 }
153 
connect(std::string const & conninfo)154 void table_connection_t::connect(std::string const &conninfo)
155 {
156     assert(!m_db_connection);
157 
158     m_db_connection = std::make_unique<pg_conn_t>(conninfo);
159     m_db_connection->exec("SET synchronous_commit = off");
160 }
161 
start(bool append)162 void table_connection_t::start(bool append)
163 {
164     assert(m_db_connection);
165 
166     m_db_connection->exec("SET client_min_messages = WARNING");
167 
168     if (!append) {
169         m_db_connection->exec(
170             "DROP TABLE IF EXISTS {} CASCADE"_format(table().full_name()));
171     }
172 
173     // These _tmp tables can be left behind if we run out of disk space.
174     m_db_connection->exec(
175         "DROP TABLE IF EXISTS {}"_format(table().full_tmp_name()));
176     m_db_connection->exec("RESET client_min_messages");
177 
178     if (!append) {
179         m_db_connection->exec(table().build_sql_create_table(
180             table().cluster_by_geom() ? flex_table_t::table_type::interim
181                                       : flex_table_t::table_type::permanent,
182             table().full_name()));
183 
184         if (table().has_geom_column() &&
185             table().geom_column().needs_isvalid()) {
186             create_geom_check_trigger(m_db_connection.get(), table().schema(),
187                                       table().name(),
188                                       table().geom_column().name());
189         }
190     }
191 
192     prepare();
193 }
194 
stop(bool updateable,bool append)195 void table_connection_t::stop(bool updateable, bool append)
196 {
197     assert(m_db_connection);
198 
199     m_copy_mgr.sync();
200 
201     if (append) {
202         teardown();
203         return;
204     }
205 
206     if (table().cluster_by_geom()) {
207         if (table().geom_column().needs_isvalid()) {
208             drop_geom_check_trigger(m_db_connection.get(), table().schema(),
209                                     table().name());
210         }
211 
212         log_info("Clustering table '{}' by geometry...", table().name());
213 
214         // Notices about invalid geometries are expected and can be ignored
215         // because they say nothing about the validity of the geometry in OSM.
216         m_db_connection->exec("SET client_min_messages = WARNING");
217 
218         m_db_connection->exec(table().build_sql_create_table(
219             flex_table_t::table_type::permanent, table().full_tmp_name()));
220 
221         std::string const columns = table().build_sql_column_list();
222         std::string sql = "INSERT INTO {} ({}) SELECT {} FROM {}"_format(
223             table().full_tmp_name(), columns, columns, table().full_name());
224 
225         auto const postgis_version = get_postgis_version(*m_db_connection);
226 
227         sql += " ORDER BY ";
228         if (postgis_version.major == 2 && postgis_version.minor < 4) {
229             log_debug("Using GeoHash for clustering table '{}'",
230                       table().name());
231             if (table().geom_column().srid() == 4326) {
232                 sql += "ST_GeoHash({},10)"_format(table().geom_column().name());
233             } else {
234                 sql +=
235                     "ST_GeoHash(ST_Transform(ST_Envelope({}),4326),10)"_format(
236                         table().geom_column().name());
237             }
238             sql += " COLLATE \"C\"";
239         } else {
240             log_debug("Using native order for clustering table '{}'",
241                       table().name());
242             // Since Postgis 2.4 the order function for geometries gives
243             // useful results.
244             sql += table().geom_column().name();
245         }
246 
247         m_db_connection->exec(sql);
248 
249         m_db_connection->exec("DROP TABLE {}"_format(table().full_name()));
250         m_db_connection->exec("ALTER TABLE {} RENAME TO \"{}\""_format(
251             table().full_tmp_name(), table().name()));
252         m_id_index_created = false;
253 
254         if (updateable && table().geom_column().needs_isvalid()) {
255             create_geom_check_trigger(m_db_connection.get(), table().schema(),
256                                       table().name(),
257                                       table().geom_column().name());
258         }
259     }
260 
261     if (table().has_geom_column()) {
262         log_info("Creating geometry index on table '{}'...", table().name());
263 
264         // Use fillfactor 100 for un-updateable imports
265         m_db_connection->exec(
266             "CREATE INDEX ON {} USING GIST (\"{}\") {} {}"_format(
267                 table().full_name(), table().geom_column().name(),
268                 (updateable ? "" : "WITH (fillfactor = 100)"),
269                 tablespace_clause(table().index_tablespace())));
270     }
271 
272     if (updateable && table().has_id_column()) {
273         create_id_index();
274     }
275 
276     log_info("Analyzing table '{}'...", table().name());
277     analyze_table(*m_db_connection, table().schema(), table().name());
278 
279     teardown();
280 }
281 
prepare()282 void table_connection_t::prepare()
283 {
284     assert(m_db_connection);
285     if (table().has_id_column() && table().has_geom_column()) {
286         m_db_connection->exec(table().build_sql_prepare_get_wkb());
287     }
288 }
289 
create_id_index()290 void table_connection_t::create_id_index()
291 {
292     if (m_id_index_created) {
293         log_debug("Id index on table '{}' already created.", table().name());
294     } else {
295         log_info("Creating id index on table '{}'...", table().name());
296         m_db_connection->exec(table().build_sql_create_id_index());
297         m_id_index_created = true;
298     }
299 }
300 
get_geom_by_id(osmium::item_type type,osmid_t id) const301 pg_result_t table_connection_t::get_geom_by_id(osmium::item_type type,
302                                                osmid_t id) const
303 {
304     assert(table().has_geom_column());
305     assert(m_db_connection);
306     std::string const id_str = fmt::to_string(id);
307     if (table().has_multicolumn_id_index()) {
308         return m_db_connection->exec_prepared(
309             "get_wkb", type_to_char(type), id_str.c_str());
310     }
311     return m_db_connection->exec_prepared("get_wkb", id_str);
312 }
313 
delete_rows_with(osmium::item_type type,osmid_t id)314 void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id)
315 {
316     m_copy_mgr.new_line(m_target);
317 
318     if (!table().has_multicolumn_id_index()) {
319         type = osmium::item_type::undefined;
320     }
321     m_copy_mgr.delete_object(type_to_char(type)[0], id);
322 }
323 
task_wait()324 void table_connection_t::task_wait()
325 {
326     auto const run_time = m_task_result.wait();
327     log_info("All postprocessing on table '{}' done in {}.", table().name(),
328              util::human_readable_duration(run_time));
329 }
330