1 /**
2 * SPDX-License-Identifier: GPL-2.0-or-later
3 *
4 * This file is part of osm2pgsql (https://osm2pgsql.org/).
5 *
6 * Copyright (C) 2006-2021 by the osm2pgsql developer community.
7 * For a full list of authors see the git log.
8 */
9
10 #include <algorithm>
11 #include <cstdint>
12 #include <cstdio>
13 #include <limits>
14 #include <stdexcept>
15 #include <string>
16 #include <vector>
17
18 #include "format.hpp"
19 #include "logging.hpp"
20 #include "options.hpp"
21 #include "pgsql-helper.hpp"
22 #include "table.hpp"
23 #include "taginfo.hpp"
24 #include "util.hpp"
25
table_t(std::string const & name,std::string const & type,columns_t const & columns,hstores_t const & hstore_columns,int const srid,bool const append,hstore_column hstore_mode,std::shared_ptr<db_copy_thread_t> const & copy_thread,std::string const & schema)26 table_t::table_t(std::string const &name, std::string const &type,
27 columns_t const &columns, hstores_t const &hstore_columns,
28 int const srid, bool const append, hstore_column hstore_mode,
29 std::shared_ptr<db_copy_thread_t> const ©_thread,
30 std::string const &schema)
31 : m_target(std::make_shared<db_target_descr_t>(name.c_str(), "osm_id")),
32 m_type(type), m_srid(fmt::to_string(srid)), m_append(append),
33 m_hstore_mode(hstore_mode), m_columns(columns),
34 m_hstore_columns(hstore_columns), m_copy(copy_thread)
35 {
36 m_target->schema = schema;
37
38 // if we dont have any columns
39 if (m_columns.empty() && m_hstore_mode != hstore_column::all) {
40 throw std::runtime_error{
41 "No columns provided for table {}."_format(name)};
42 }
43
44 generate_copy_column_list();
45 }
46
table_t(table_t const & other,std::shared_ptr<db_copy_thread_t> const & copy_thread)47 table_t::table_t(table_t const &other,
48 std::shared_ptr<db_copy_thread_t> const ©_thread)
49 : m_conninfo(other.m_conninfo), m_target(other.m_target), m_type(other.m_type),
50 m_srid(other.m_srid), m_append(other.m_append),
51 m_hstore_mode(other.m_hstore_mode), m_columns(other.m_columns),
52 m_hstore_columns(other.m_hstore_columns), m_table_space(other.m_table_space),
53 m_copy(copy_thread)
54 {
55 // if the other table has already started, then we want to execute
56 // the same stuff to get into the same state. but if it hasn't, then
57 // this would be premature.
58 if (other.m_sql_conn) {
59 connect();
60 prepare();
61 }
62 }
63
teardown()64 void table_t::teardown() { m_sql_conn.reset(); }
65
sync()66 void table_t::sync() { m_copy.sync(); }
67
connect()68 void table_t::connect()
69 {
70 m_sql_conn = std::make_unique<pg_conn_t>(m_conninfo);
71 //let commits happen faster by delaying when they actually occur
72 m_sql_conn->exec("SET synchronous_commit = off");
73 }
74
start(std::string const & conninfo,std::string const & table_space)75 void table_t::start(std::string const &conninfo, std::string const &table_space)
76 {
77 if (m_sql_conn) {
78 throw std::runtime_error{m_target->name +
79 " cannot start, its already started."};
80 }
81
82 m_conninfo = conninfo;
83 m_table_space = tablespace_clause(table_space);
84
85 connect();
86 log_info("Setting up table '{}'", m_target->name);
87 m_sql_conn->exec("SET client_min_messages = WARNING");
88 auto const qual_name = qualified_name(m_target->schema, m_target->name);
89 auto const qual_tmp_name = qualified_name(
90 m_target->schema, m_target->name + "_tmp");
91
92 // we are making a new table
93 if (!m_append) {
94 m_sql_conn->exec(
95 "DROP TABLE IF EXISTS {} CASCADE"_format(qual_name));
96 }
97
98 // These _tmp tables can be left behind if we run out of disk space.
99 m_sql_conn->exec("DROP TABLE IF EXISTS {}"_format(qual_tmp_name));
100 m_sql_conn->exec("RESET client_min_messages");
101
102 //making a new table
103 if (!m_append) {
104 //define the new table
105 auto sql =
106 "CREATE UNLOGGED TABLE {} (osm_id int8,"_format(qual_name);
107
108 //first with the regular columns
109 for (auto const &column : m_columns) {
110 sql += "\"{}\" {},"_format(column.name, column.type_name);
111 }
112
113 //then with the hstore columns
114 for (auto const &hcolumn : m_hstore_columns) {
115 sql += "\"{}\" hstore,"_format(hcolumn);
116 }
117
118 //add tags column
119 if (m_hstore_mode != hstore_column::none) {
120 sql += "\"tags\" hstore,";
121 }
122
123 sql += "way geometry({},{}) )"_format(m_type, m_srid);
124
125 // The final tables are created with CREATE TABLE AS ... SELECT * FROM ...
126 // This means that they won't get this autovacuum setting, so it doesn't
127 // doesn't need to be RESET on these tables
128 sql += " WITH (autovacuum_enabled = off)";
129 //add the main table space
130 sql += m_table_space;
131
132 //create the table
133 m_sql_conn->exec(sql);
134
135 if (m_srid != "4326") {
136 create_geom_check_trigger(m_sql_conn.get(), m_target->schema,
137 m_target->name, "way");
138 }
139 }
140
141 prepare();
142 }
143
prepare()144 void table_t::prepare()
145 {
146 //let postgres cache this query as it will presumably happen a lot
147 auto const qual_name = qualified_name(m_target->schema, m_target->name);
148 m_sql_conn->exec(
149 "PREPARE get_wkb(int8) AS SELECT way FROM {} WHERE osm_id = $1"_format(
150 qual_name));
151 }
152
generate_copy_column_list()153 void table_t::generate_copy_column_list()
154 {
155 m_target->rows = "osm_id,";
156 //first with the regular columns
157 for (auto const &column : m_columns) {
158 m_target->rows += '"';
159 m_target->rows += column.name;
160 m_target->rows += "\",";
161 }
162
163 //then with the hstore columns
164 for (auto const &hcolumn : m_hstore_columns) {
165 m_target->rows += '"';
166 m_target->rows += hcolumn;
167 m_target->rows += "\",";
168 }
169
170 //add tags column and geom column
171 if (m_hstore_mode != hstore_column::none) {
172 m_target->rows += "tags,way";
173 //or just the geom column
174 } else {
175 m_target->rows += "way";
176 }
177 }
178
stop(bool updateable,bool enable_hstore_index,std::string const & table_space_index)179 void table_t::stop(bool updateable, bool enable_hstore_index,
180 std::string const &table_space_index)
181 {
182 // make sure that all data is written to the DB before continuing
183 m_copy.sync();
184
185 auto const qual_name = qualified_name(m_target->schema, m_target->name);
186 auto const qual_tmp_name = qualified_name(
187 m_target->schema, m_target->name + "_tmp");
188
189 if (!m_append) {
190 if (m_srid != "4326") {
191 drop_geom_check_trigger(m_sql_conn.get(), m_target->schema,
192 m_target->name);
193 }
194
195 log_info("Clustering table '{}' by geometry...", m_target->name);
196
197 // Notices about invalid geometries are expected and can be ignored
198 // because they say nothing about the validity of the geometry in OSM.
199 m_sql_conn->exec("SET client_min_messages = WARNING");
200
201 std::string sql =
202 "CREATE TABLE {} {} AS SELECT * FROM {}"_format(
203 qual_tmp_name, m_table_space, qual_name);
204
205 auto const postgis_version = get_postgis_version(*m_sql_conn);
206
207 sql += " ORDER BY ";
208 if (postgis_version.major == 2 && postgis_version.minor < 4) {
209 log_debug("Using GeoHash for clustering table '{}'",
210 m_target->name);
211 if (m_srid == "4326") {
212 sql += "ST_GeoHash(way,10)";
213 } else {
214 sql += "ST_GeoHash(ST_Transform(ST_Envelope(way),4326),10)";
215 }
216 sql += " COLLATE \"C\"";
217 } else {
218 log_debug("Using native order for clustering table '{}'",
219 m_target->name);
220 // Since Postgis 2.4 the order function for geometries gives
221 // useful results.
222 sql += "way";
223 }
224
225 m_sql_conn->exec(sql);
226
227 m_sql_conn->exec("DROP TABLE {}"_format(qual_name));
228 m_sql_conn->exec("ALTER TABLE {} RENAME TO \"{}\""_format(
229 qual_tmp_name, m_target->name));
230
231 log_info("Creating geometry index on table '{}'...", m_target->name);
232
233 // Use fillfactor 100 for un-updatable imports
234 m_sql_conn->exec("CREATE INDEX ON {} USING GIST (way) {} {}"_format(
235 qual_name, (updateable ? "" : "WITH (fillfactor = 100)"),
236 tablespace_clause(table_space_index)));
237
238 /* slim mode needs this to be able to apply diffs */
239 if (updateable) {
240 log_info("Creating osm_id index on table '{}'...", m_target->name);
241 m_sql_conn->exec(
242 "CREATE INDEX ON {} USING BTREE (osm_id) {}"_format(
243 qual_name, tablespace_clause(table_space_index)));
244 if (m_srid != "4326") {
245 create_geom_check_trigger(m_sql_conn.get(), m_target->schema,
246 m_target->name, "way");
247 }
248 }
249
250 /* Create hstore index if selected */
251 if (enable_hstore_index) {
252 log_info("Creating hstore indexes on table '{}'...",
253 m_target->name);
254 if (m_hstore_mode != hstore_column::none) {
255 m_sql_conn->exec(
256 "CREATE INDEX ON {} USING GIN (tags) {}"_format(
257 qual_name, tablespace_clause(table_space_index)));
258 }
259 for (auto const &hcolumn : m_hstore_columns) {
260 m_sql_conn->exec(
261 "CREATE INDEX ON {} USING GIN (\"{}\") {}"_format(
262 qual_name, hcolumn,
263 tablespace_clause(table_space_index)));
264 }
265 }
266 log_info("Analyzing table '{}'...", m_target->name);
267 analyze_table(*m_sql_conn, m_target->schema, m_target->name);
268 }
269 teardown();
270 }
271
delete_row(osmid_t const id)272 void table_t::delete_row(osmid_t const id)
273 {
274 m_copy.new_line(m_target);
275 m_copy.delete_object(id);
276 }
277
write_row(osmid_t id,taglist_t const & tags,std::string const & geom)278 void table_t::write_row(osmid_t id, taglist_t const &tags,
279 std::string const &geom)
280 {
281 m_copy.new_line(m_target);
282
283 //add the osm id
284 m_copy.add_column(id);
285
286 // used to remember which columns have been written out already.
287 std::vector<bool> used;
288
289 if (m_hstore_mode != hstore_column::none) {
290 used.assign(tags.size(), false);
291 }
292
293 //get the regular columns' values
294 write_columns(tags, m_hstore_mode == hstore_column::norm ? &used : nullptr);
295
296 //get the hstore columns' values
297 write_hstore_columns(tags);
298
299 //get the key value pairs for the tags column
300 if (m_hstore_mode != hstore_column::none) {
301 write_tags_column(tags, used);
302 }
303
304 //add the geometry - encoding it to hex along the way
305 m_copy.add_hex_geom(geom);
306
307 //send all the data to postgres
308 m_copy.finish_line();
309 }
310
write_columns(taglist_t const & tags,std::vector<bool> * used)311 void table_t::write_columns(taglist_t const &tags, std::vector<bool> *used)
312 {
313 for (auto const &column : m_columns) {
314 std::size_t const idx = tags.indexof(column.name);
315 if (idx != std::numeric_limits<std::size_t>::max()) {
316 escape_type(tags[idx].value, column.type);
317
318 // Remember we already used this one so we can't use
319 // again later in the hstore column.
320 if (used) {
321 (*used)[idx] = true;
322 }
323 } else {
324 m_copy.add_null_column();
325 }
326 }
327 }
328
329 /// Write all tags to hstore. Exclude tags written to other columns and z_order.
write_tags_column(taglist_t const & tags,std::vector<bool> const & used)330 void table_t::write_tags_column(taglist_t const &tags,
331 std::vector<bool> const &used)
332 {
333 m_copy.new_hash();
334
335 for (std::size_t i = 0; i < tags.size(); ++i) {
336 tag_t const &tag = tags[i];
337 if (!used[i] && (tag.key != "z_order")) {
338 m_copy.add_hash_elem(tag.key, tag.value);
339 }
340 }
341
342 m_copy.finish_hash();
343 }
344
345 /* write an hstore column to the database */
write_hstore_columns(taglist_t const & tags)346 void table_t::write_hstore_columns(taglist_t const &tags)
347 {
348 for (auto const &hcolumn : m_hstore_columns) {
349 bool added = false;
350
351 for (auto const &tag : tags) {
352 //check if the tag's key starts with the name of the hstore column
353 if (tag.key.compare(0, hcolumn.size(), hcolumn) == 0) {
354 char const *const shortkey = &tag.key[hcolumn.size()];
355
356 //and pack the shortkey with its value into the hstore
357 //hstore ASCII representation looks like "key"=>"value"
358 if (!added) {
359 added = true;
360 m_copy.new_hash();
361 }
362
363 m_copy.add_hash_elem(shortkey, tag.value.c_str());
364 }
365 }
366
367 if (added) {
368 m_copy.finish_hash();
369 } else {
370 m_copy.add_null_column();
371 }
372 }
373 }
374
task_wait()375 void table_t::task_wait()
376 {
377 auto const run_time = m_task_result.wait();
378 log_info("All postprocessing on table '{}' done in {}.", m_target->name,
379 util::human_readable_duration(run_time));
380 }
381
382 /* Escape data appropriate to the type */
escape_type(std::string const & value,ColumnType flags)383 void table_t::escape_type(std::string const &value, ColumnType flags)
384 {
385 switch (flags) {
386 case ColumnType::INT: {
387 // For integers we take the first number, or the average if it's a-b
388 long long from = 0;
389 long long to = 0;
390 // limit number of digits parsed to avoid undefined behaviour in sscanf
391 int const items =
392 std::sscanf(value.c_str(), "%18lld-%18lld", &from, &to);
393 if (items == 1 && from <= std::numeric_limits<int32_t>::max() &&
394 from >= std::numeric_limits<int32_t>::min()) {
395 m_copy.add_column(from);
396 } else if (items == 2) {
397 // calculate mean while avoiding overflows
398 int64_t const mean =
399 (from / 2) + (to / 2) + ((from % 2 + to % 2) / 2);
400 if (mean <= std::numeric_limits<int32_t>::max() &&
401 mean >= std::numeric_limits<int32_t>::min()) {
402 m_copy.add_column(mean);
403 } else {
404 m_copy.add_null_column();
405 }
406 } else {
407 m_copy.add_null_column();
408 }
409 break;
410 }
411 case ColumnType::REAL:
412 /* try to "repair" real values as follows:
413 * assume "," to be a decimal mark which need to be replaced by "."
414 * like int4 take the first number, or the average if it's a-b
415 * assume SI unit (meters)
416 * convert feet to meters (1 foot = 0.3048 meters)
417 * reject anything else
418 */
419 {
420 std::string escaped{value};
421 std::replace(escaped.begin(), escaped.end(), ',', '.');
422
423 double from = NAN;
424 double to = NAN;
425 int const items =
426 std::sscanf(escaped.c_str(), "%lf-%lf", &from, &to);
427 if (items == 1) {
428 if (escaped.size() > 1 &&
429 escaped.substr(escaped.size() - 2) == "ft") {
430 from *= 0.3048;
431 }
432 m_copy.add_column(from);
433 } else if (items == 2) {
434 if (escaped.size() > 1 &&
435 escaped.substr(escaped.size() - 2) == "ft") {
436 from *= 0.3048;
437 to *= 0.3048;
438 }
439 m_copy.add_column((from + to) / 2);
440 } else {
441 m_copy.add_null_column();
442 }
443 break;
444 }
445 case ColumnType::TEXT:
446 m_copy.add_column(value);
447 break;
448 }
449 }
450
get_wkb(osmid_t id)451 pg_result_t table_t::get_wkb(osmid_t id)
452 {
453 return m_sql_conn->exec_prepared("get_wkb", id);
454 }
455
456