1 /**
2  * SPDX-License-Identifier: GPL-2.0-or-later
3  *
4  * This file is part of osm2pgsql (https://osm2pgsql.org/).
5  *
6  * Copyright (C) 2006-2021 by the osm2pgsql developer community.
7  * For a full list of authors see the git log.
8  */
9 
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <limits>
#include <stdexcept>
#include <string>
#include <vector>
17 
18 #include "format.hpp"
19 #include "logging.hpp"
20 #include "options.hpp"
21 #include "pgsql-helper.hpp"
22 #include "table.hpp"
23 #include "taginfo.hpp"
24 #include "util.hpp"
25 
table_t(std::string const & name,std::string const & type,columns_t const & columns,hstores_t const & hstore_columns,int const srid,bool const append,hstore_column hstore_mode,std::shared_ptr<db_copy_thread_t> const & copy_thread,std::string const & schema)26 table_t::table_t(std::string const &name, std::string const &type,
27                  columns_t const &columns, hstores_t const &hstore_columns,
28                  int const srid, bool const append, hstore_column hstore_mode,
29                  std::shared_ptr<db_copy_thread_t> const &copy_thread,
30                  std::string const &schema)
31 : m_target(std::make_shared<db_target_descr_t>(name.c_str(), "osm_id")),
32   m_type(type), m_srid(fmt::to_string(srid)), m_append(append),
33   m_hstore_mode(hstore_mode), m_columns(columns),
34   m_hstore_columns(hstore_columns), m_copy(copy_thread)
35 {
36     m_target->schema = schema;
37 
38     // if we dont have any columns
39     if (m_columns.empty() && m_hstore_mode != hstore_column::all) {
40         throw std::runtime_error{
41             "No columns provided for table {}."_format(name)};
42     }
43 
44     generate_copy_column_list();
45 }
46 
table_t(table_t const & other,std::shared_ptr<db_copy_thread_t> const & copy_thread)47 table_t::table_t(table_t const &other,
48                  std::shared_ptr<db_copy_thread_t> const &copy_thread)
49 : m_conninfo(other.m_conninfo), m_target(other.m_target), m_type(other.m_type),
50   m_srid(other.m_srid), m_append(other.m_append),
51   m_hstore_mode(other.m_hstore_mode), m_columns(other.m_columns),
52   m_hstore_columns(other.m_hstore_columns), m_table_space(other.m_table_space),
53   m_copy(copy_thread)
54 {
55     // if the other table has already started, then we want to execute
56     // the same stuff to get into the same state. but if it hasn't, then
57     // this would be premature.
58     if (other.m_sql_conn) {
59         connect();
60         prepare();
61     }
62 }
63 
teardown()64 void table_t::teardown() { m_sql_conn.reset(); }
65 
sync()66 void table_t::sync() { m_copy.sync(); }
67 
connect()68 void table_t::connect()
69 {
70     m_sql_conn = std::make_unique<pg_conn_t>(m_conninfo);
71     //let commits happen faster by delaying when they actually occur
72     m_sql_conn->exec("SET synchronous_commit = off");
73 }
74 
start(std::string const & conninfo,std::string const & table_space)75 void table_t::start(std::string const &conninfo, std::string const &table_space)
76 {
77     if (m_sql_conn) {
78         throw std::runtime_error{m_target->name +
79                                  " cannot start, its already started."};
80     }
81 
82     m_conninfo = conninfo;
83     m_table_space = tablespace_clause(table_space);
84 
85     connect();
86     log_info("Setting up table '{}'", m_target->name);
87     m_sql_conn->exec("SET client_min_messages = WARNING");
88     auto const qual_name = qualified_name(m_target->schema, m_target->name);
89     auto const qual_tmp_name = qualified_name(
90         m_target->schema, m_target->name + "_tmp");
91 
92     // we are making a new table
93     if (!m_append) {
94         m_sql_conn->exec(
95             "DROP TABLE IF EXISTS {} CASCADE"_format(qual_name));
96     }
97 
98     // These _tmp tables can be left behind if we run out of disk space.
99     m_sql_conn->exec("DROP TABLE IF EXISTS {}"_format(qual_tmp_name));
100     m_sql_conn->exec("RESET client_min_messages");
101 
102     //making a new table
103     if (!m_append) {
104         //define the new table
105         auto sql =
106             "CREATE UNLOGGED TABLE {} (osm_id int8,"_format(qual_name);
107 
108         //first with the regular columns
109         for (auto const &column : m_columns) {
110             sql += "\"{}\" {},"_format(column.name, column.type_name);
111         }
112 
113         //then with the hstore columns
114         for (auto const &hcolumn : m_hstore_columns) {
115             sql += "\"{}\" hstore,"_format(hcolumn);
116         }
117 
118         //add tags column
119         if (m_hstore_mode != hstore_column::none) {
120             sql += "\"tags\" hstore,";
121         }
122 
123         sql += "way geometry({},{}) )"_format(m_type, m_srid);
124 
125         // The final tables are created with CREATE TABLE AS ... SELECT * FROM ...
126         // This means that they won't get this autovacuum setting, so it doesn't
127         // doesn't need to be RESET on these tables
128         sql += " WITH (autovacuum_enabled = off)";
129         //add the main table space
130         sql += m_table_space;
131 
132         //create the table
133         m_sql_conn->exec(sql);
134 
135         if (m_srid != "4326") {
136             create_geom_check_trigger(m_sql_conn.get(), m_target->schema,
137                                       m_target->name, "way");
138         }
139     }
140 
141     prepare();
142 }
143 
prepare()144 void table_t::prepare()
145 {
146     //let postgres cache this query as it will presumably happen a lot
147     auto const qual_name = qualified_name(m_target->schema, m_target->name);
148     m_sql_conn->exec(
149         "PREPARE get_wkb(int8) AS SELECT way FROM {} WHERE osm_id = $1"_format(
150             qual_name));
151 }
152 
generate_copy_column_list()153 void table_t::generate_copy_column_list()
154 {
155     m_target->rows = "osm_id,";
156     //first with the regular columns
157     for (auto const &column : m_columns) {
158         m_target->rows += '"';
159         m_target->rows += column.name;
160         m_target->rows += "\",";
161     }
162 
163     //then with the hstore columns
164     for (auto const &hcolumn : m_hstore_columns) {
165         m_target->rows += '"';
166         m_target->rows += hcolumn;
167         m_target->rows += "\",";
168     }
169 
170     //add tags column and geom column
171     if (m_hstore_mode != hstore_column::none) {
172         m_target->rows += "tags,way";
173         //or just the geom column
174     } else {
175         m_target->rows += "way";
176     }
177 }
178 
stop(bool updateable,bool enable_hstore_index,std::string const & table_space_index)179 void table_t::stop(bool updateable, bool enable_hstore_index,
180                    std::string const &table_space_index)
181 {
182     // make sure that all data is written to the DB before continuing
183     m_copy.sync();
184 
185     auto const qual_name = qualified_name(m_target->schema, m_target->name);
186     auto const qual_tmp_name = qualified_name(
187         m_target->schema, m_target->name + "_tmp");
188 
189     if (!m_append) {
190         if (m_srid != "4326") {
191             drop_geom_check_trigger(m_sql_conn.get(), m_target->schema,
192                                     m_target->name);
193         }
194 
195         log_info("Clustering table '{}' by geometry...", m_target->name);
196 
197         // Notices about invalid geometries are expected and can be ignored
198         // because they say nothing about the validity of the geometry in OSM.
199         m_sql_conn->exec("SET client_min_messages = WARNING");
200 
201         std::string sql =
202             "CREATE TABLE {} {} AS SELECT * FROM {}"_format(
203                 qual_tmp_name, m_table_space, qual_name);
204 
205         auto const postgis_version = get_postgis_version(*m_sql_conn);
206 
207         sql += " ORDER BY ";
208         if (postgis_version.major == 2 && postgis_version.minor < 4) {
209             log_debug("Using GeoHash for clustering table '{}'",
210                       m_target->name);
211             if (m_srid == "4326") {
212                 sql += "ST_GeoHash(way,10)";
213             } else {
214                 sql += "ST_GeoHash(ST_Transform(ST_Envelope(way),4326),10)";
215             }
216             sql += " COLLATE \"C\"";
217         } else {
218             log_debug("Using native order for clustering table '{}'",
219                       m_target->name);
220             // Since Postgis 2.4 the order function for geometries gives
221             // useful results.
222             sql += "way";
223         }
224 
225         m_sql_conn->exec(sql);
226 
227         m_sql_conn->exec("DROP TABLE {}"_format(qual_name));
228         m_sql_conn->exec("ALTER TABLE {} RENAME TO \"{}\""_format(
229             qual_tmp_name, m_target->name));
230 
231         log_info("Creating geometry index on table '{}'...", m_target->name);
232 
233         // Use fillfactor 100 for un-updatable imports
234         m_sql_conn->exec("CREATE INDEX ON {} USING GIST (way) {} {}"_format(
235             qual_name, (updateable ? "" : "WITH (fillfactor = 100)"),
236             tablespace_clause(table_space_index)));
237 
238         /* slim mode needs this to be able to apply diffs */
239         if (updateable) {
240             log_info("Creating osm_id index on table '{}'...", m_target->name);
241             m_sql_conn->exec(
242                 "CREATE INDEX ON {} USING BTREE (osm_id) {}"_format(
243                     qual_name, tablespace_clause(table_space_index)));
244             if (m_srid != "4326") {
245                 create_geom_check_trigger(m_sql_conn.get(), m_target->schema,
246                                           m_target->name, "way");
247             }
248         }
249 
250         /* Create hstore index if selected */
251         if (enable_hstore_index) {
252             log_info("Creating hstore indexes on table '{}'...",
253                      m_target->name);
254             if (m_hstore_mode != hstore_column::none) {
255                 m_sql_conn->exec(
256                     "CREATE INDEX ON {} USING GIN (tags) {}"_format(
257                         qual_name, tablespace_clause(table_space_index)));
258             }
259             for (auto const &hcolumn : m_hstore_columns) {
260                 m_sql_conn->exec(
261                     "CREATE INDEX ON {} USING GIN (\"{}\") {}"_format(
262                         qual_name, hcolumn,
263                         tablespace_clause(table_space_index)));
264             }
265         }
266         log_info("Analyzing table '{}'...", m_target->name);
267         analyze_table(*m_sql_conn, m_target->schema, m_target->name);
268     }
269     teardown();
270 }
271 
delete_row(osmid_t const id)272 void table_t::delete_row(osmid_t const id)
273 {
274     m_copy.new_line(m_target);
275     m_copy.delete_object(id);
276 }
277 
write_row(osmid_t id,taglist_t const & tags,std::string const & geom)278 void table_t::write_row(osmid_t id, taglist_t const &tags,
279                         std::string const &geom)
280 {
281     m_copy.new_line(m_target);
282 
283     //add the osm id
284     m_copy.add_column(id);
285 
286     // used to remember which columns have been written out already.
287     std::vector<bool> used;
288 
289     if (m_hstore_mode != hstore_column::none) {
290         used.assign(tags.size(), false);
291     }
292 
293     //get the regular columns' values
294     write_columns(tags, m_hstore_mode == hstore_column::norm ? &used : nullptr);
295 
296     //get the hstore columns' values
297     write_hstore_columns(tags);
298 
299     //get the key value pairs for the tags column
300     if (m_hstore_mode != hstore_column::none) {
301         write_tags_column(tags, used);
302     }
303 
304     //add the geometry - encoding it to hex along the way
305     m_copy.add_hex_geom(geom);
306 
307     //send all the data to postgres
308     m_copy.finish_line();
309 }
310 
write_columns(taglist_t const & tags,std::vector<bool> * used)311 void table_t::write_columns(taglist_t const &tags, std::vector<bool> *used)
312 {
313     for (auto const &column : m_columns) {
314         std::size_t const idx = tags.indexof(column.name);
315         if (idx != std::numeric_limits<std::size_t>::max()) {
316             escape_type(tags[idx].value, column.type);
317 
318             // Remember we already used this one so we can't use
319             // again later in the hstore column.
320             if (used) {
321                 (*used)[idx] = true;
322             }
323         } else {
324             m_copy.add_null_column();
325         }
326     }
327 }
328 
329 /// Write all tags to hstore. Exclude tags written to other columns and z_order.
write_tags_column(taglist_t const & tags,std::vector<bool> const & used)330 void table_t::write_tags_column(taglist_t const &tags,
331                                 std::vector<bool> const &used)
332 {
333     m_copy.new_hash();
334 
335     for (std::size_t i = 0; i < tags.size(); ++i) {
336         tag_t const &tag = tags[i];
337         if (!used[i] && (tag.key != "z_order")) {
338             m_copy.add_hash_elem(tag.key, tag.value);
339         }
340     }
341 
342     m_copy.finish_hash();
343 }
344 
345 /* write an hstore column to the database */
write_hstore_columns(taglist_t const & tags)346 void table_t::write_hstore_columns(taglist_t const &tags)
347 {
348     for (auto const &hcolumn : m_hstore_columns) {
349         bool added = false;
350 
351         for (auto const &tag : tags) {
352             //check if the tag's key starts with the name of the hstore column
353             if (tag.key.compare(0, hcolumn.size(), hcolumn) == 0) {
354                 char const *const shortkey = &tag.key[hcolumn.size()];
355 
356                 //and pack the shortkey with its value into the hstore
357                 //hstore ASCII representation looks like "key"=>"value"
358                 if (!added) {
359                     added = true;
360                     m_copy.new_hash();
361                 }
362 
363                 m_copy.add_hash_elem(shortkey, tag.value.c_str());
364             }
365         }
366 
367         if (added) {
368             m_copy.finish_hash();
369         } else {
370             m_copy.add_null_column();
371         }
372     }
373 }
374 
task_wait()375 void table_t::task_wait()
376 {
377     auto const run_time = m_task_result.wait();
378     log_info("All postprocessing on table '{}' done in {}.", m_target->name,
379              util::human_readable_duration(run_time));
380 }
381 
382 /* Escape data appropriate to the type */
escape_type(std::string const & value,ColumnType flags)383 void table_t::escape_type(std::string const &value, ColumnType flags)
384 {
385     switch (flags) {
386     case ColumnType::INT: {
387         // For integers we take the first number, or the average if it's a-b
388         long long from = 0;
389         long long to = 0;
390         // limit number of digits parsed to avoid undefined behaviour in sscanf
391         int const items =
392             std::sscanf(value.c_str(), "%18lld-%18lld", &from, &to);
393         if (items == 1 && from <= std::numeric_limits<int32_t>::max() &&
394             from >= std::numeric_limits<int32_t>::min()) {
395             m_copy.add_column(from);
396         } else if (items == 2) {
397             // calculate mean while avoiding overflows
398             int64_t const mean =
399                 (from / 2) + (to / 2) + ((from % 2 + to % 2) / 2);
400             if (mean <= std::numeric_limits<int32_t>::max() &&
401                 mean >= std::numeric_limits<int32_t>::min()) {
402                 m_copy.add_column(mean);
403             } else {
404                 m_copy.add_null_column();
405             }
406         } else {
407             m_copy.add_null_column();
408         }
409         break;
410     }
411     case ColumnType::REAL:
412         /* try to "repair" real values as follows:
413          * assume "," to be a decimal mark which need to be replaced by "."
414          * like int4 take the first number, or the average if it's a-b
415          * assume SI unit (meters)
416          * convert feet to meters (1 foot = 0.3048 meters)
417          * reject anything else
418          */
419         {
420             std::string escaped{value};
421             std::replace(escaped.begin(), escaped.end(), ',', '.');
422 
423             double from = NAN;
424             double to = NAN;
425             int const items =
426                 std::sscanf(escaped.c_str(), "%lf-%lf", &from, &to);
427             if (items == 1) {
428                 if (escaped.size() > 1 &&
429                     escaped.substr(escaped.size() - 2) == "ft") {
430                     from *= 0.3048;
431                 }
432                 m_copy.add_column(from);
433             } else if (items == 2) {
434                 if (escaped.size() > 1 &&
435                     escaped.substr(escaped.size() - 2) == "ft") {
436                     from *= 0.3048;
437                     to *= 0.3048;
438                 }
439                 m_copy.add_column((from + to) / 2);
440             } else {
441                 m_copy.add_null_column();
442             }
443             break;
444         }
445     case ColumnType::TEXT:
446         m_copy.add_column(value);
447         break;
448     }
449 }
450 
get_wkb(osmid_t id)451 pg_result_t table_t::get_wkb(osmid_t id)
452 {
453     return m_sql_conn->exec_prepared("get_wkb", id);
454 }
455 
456