1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2006-2015, University of Amsterdam 7 VU University Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(rdf_persistency, 37 [ rdf_attach_db/2, % +Directory, +Options 38 rdf_detach_db/0, % +Detach current Graph 39 rdf_current_db/1, % -Directory 40 rdf_persistency/2, % +Graph, +Bool 41 rdf_flush_journals/1, % +Options 42 rdf_persistency_property/1, % ?Property 43 rdf_journal_file/2, % ?Graph, ?JournalFile 44 rdf_snapshot_file/2, % ?Graph, ?SnapshotFile 45 rdf_db_to_file/2 % ?Graph, ?FileBase 46 ]). 
:- use_module(library(semweb/rdf_db),
              [ rdf_graph/1, rdf_unload_graph/1, rdf_statistics/1,
                rdf_load_db/1, rdf_retractall/4, rdf_create_graph/1,
                rdf_assert/4, rdf_update/5, rdf_monitor/2, rdf/4,
                rdf_save_db/2, rdf_atom_md5/3, rdf_current_ns/2,
                rdf_register_ns/3
              ]).

:- autoload(library(apply),[maplist/2,maplist/3,partition/4,exclude/3]).
:- autoload(library(debug),[debug/3]).
:- autoload(library(error),
            [permission_error/3,must_be/2,domain_error/2]).
:- autoload(library(filesex),
            [directory_file_path/3,make_directory_path/1]).
:- autoload(library(lists),[select/3,append/3]).
:- autoload(library(option),[option/2,option/3]).
:- autoload(library(readutil),[read_file_to_terms/3]).
:- autoload(library(socket),[gethostname/1]).
:- autoload(library(thread),[concurrent/3]).
:- autoload(library(uri),[uri_encoded/3]).

/** <module> RDF persistency plugin

This module provides persistency for rdf_db.pl based on the
rdf_monitor/2 predicate to track changes to the repository. Where
previous versions used autosave of the whole database using the
quick-load format of rdf_db, this version is based on a quick-load file
per source (4th argument of rdf/4), and journalling for edit operations.

The result is safe, avoids frequent small changes to large files which
makes synchronisation and backup expensive and avoids long disruption of
the server doing the autosave. Only loading large files disrupts service
for some time.

The persistent backup of the database is realised in a directory, using
a lock file to avoid corruption due to concurrent access. Each source is
represented by two files, the latest snapshot and a journal. The state
is restored by loading the snapshot and replaying the journal. The
predicate rdf_flush_journals/1 can be used to create fresh snapshots and
delete the journals.

@tbd If there is a complete `.new' snapshot and no journal, we should
     move the .new to the plain snapshot name as a means of recovery.

@tbd Backup of each graph using one or two files is very costly if there
     are many graphs.  Although the currently used subdirectories avoid
     hitting OS limits early, this is still not ideal. Probably we
     should collect (small, older?) files and combine them into a single
     quick load file.  We could call this (similar to GIT) a `pack'.

@see rdf_edit.pl
*/

:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

:- meta_predicate
    no_agc(0).

:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       directory_levels(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).

%!  rdf_attach_db(+Directory, +Options) is det.
%
%   Start persistent operations using Directory as place to store
%   files.   There are several cases:
%
%           * Empty DB, existing directory
%           Load the DB from the existing directory
%
%           * Full DB, empty directory
%           Create snapshots for all sources in directory
%
%   Options:
%
%           * access(+AccessMode)
%           One of =auto= (default), =read_write= or
%           =read_only=. Read-only access implies that the RDF
%           store is not locked. It is read at startup and all
%           modifications to the data are temporary. The default
%           =auto= mode is =read_write= if the directory is
%           writeable and the lock can be acquired.  Otherwise
%           it reverts to =read_only=.  The =auto= mode is
%           selected by omitting the option.
%
%           * concurrency(+Jobs)
%           Number of threads to use for loading the initial
%           database.  If not provided it is the number of CPUs
%           as obtained from the flag =cpu_count=.
%
%           * max_open_journals(+Count)
%           Maximum number of journals kept open.  If not provided,
%           the default is 10.  See limit_fd_pool/0.
%
%           * directory_levels(+Count)
%           Number of levels of intermediate directories for storing
%           the graph files.  Default is 2.
%
%           * silent(+BoolOrBrief)
%           If =true= (default =false=), do not print informational
%           messages.  Finally, if =brief= it will show minimal
%           feedback.
%
%           * log_nested_transactions(+Boolean)
%           If =true=, nested _log_ transactions are added to the
%           journal information.  By default (=false=), no log-term
%           is added for nested transactions.
%
%   @error existence_error(source_sink, Directory)
%   @error permission_error(write, directory, Directory)

rdf_attach_db(DirSpec, Options) :-              % explicit read-only
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-              % explicit read-write
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-              % auto, directory exists
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  % Somebody else holds the lock: warn and fall back to
            % read-only access.
            print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   % Must be an error(Formal, Context) term for the message
        % system to be able to translate it.
        print_message(warning,
                      error(permission_error(write, directory, Directory),
                            _)),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, Options)
    ).
% Final rdf_attach_db/2 clause: the directory does not exist yet.  Try
% to create and attach it read-write; on any error warn and retry in
% read-only mode.
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), Error, true),
    (   nonvar(Error)
    ->  print_message(warning, Error),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ;   true
    ).


%!  rdf_attach_db_rw(+DirSpec, +Options)
%
%   Attach the database in read-write mode.  The first clause handles
%   an existing writeable directory: unless it is already the attached
%   directory, the current database is detached, the directory is
%   locked, options are stored and the content is loaded with the
%   monitor switched off.  The second clause verifies or creates the
%   directory and re-enters rdf_attach_db/2.  The last clause raises an
%   existence or permission error.

rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Dir,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Dir)
    ->  true                                % update settings?
    ;   rdf_detach_db,
        mkdir(Dir),
        lock_db(Dir),
        assert(rdf_directory(Dir)),
        assert_options(Options),
        stop_monitor,                       % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Dir,
                       [ solutions(all)
                       ]),
    (   exists_directory(Dir)
    ->  access_file(Dir, write)
    ;   catch(make_directory(Dir), _, fail)
    ),
    !,
    rdf_attach_db(Dir, Options).
rdf_attach_db_rw(DirSpec, _) :-                 % Generate an existence or
    absolute_file_name(DirSpec,                 % permission error
                       Dir,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Dir).

%!  rdf_attach_db_ro(+Directory, +Options)
%
%   Open an RDF database in read-only mode: detach the current
%   database, remember Directory and the options and load the content.
%   No lock is taken and the change monitor is left switched off.

rdf_attach_db_ro(Dir, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Dir)),
    assert_options(Options),
    stop_monitor,                           % make sure not to register load
    no_agc(load_db).


%   assert_options(+Options)
%
%   Validate each option known to option_type/2 and store it as
%   rdf_option/1.  Unknown options are silently skipped.

assert_options([]).
assert_options([Option|Rest]) :-
    (   option_type(Option, TypeCheck)
    ->  TypeCheck,
        assert(rdf_option(Option))
    ;   true                    % ignore options we do not understand
    ),
    assert_options(Rest).

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([read_write,
                                                       read_only]), X)).


%!  rdf_persistency_property(?Property) is nondet.
%
%   True when Property is a property of the current persistent database.
%   Exposes the properties that can be passed as options to
%   rdf_attach_db/2. Specifically,
%   rdf_persistency_property(access(read_only)) is true iff the database
%   is mounted in read-only mode. In addition, the following property is
%   supported:
%
%     - directory(Dir)
%     The directory in which the database resides.

rdf_persistency_property(Property) :-
    (   var(Property)
    ->  rdf_persistency_property_(Property)         % enumerate
    ;   once(rdf_persistency_property_(Property))   % check, semidet
    ).

rdf_persistency_property_(Property) :-
    rdf_option(Property).
rdf_persistency_property_(directory(Directory)) :-
    rdf_directory(Directory).

%!  no_agc(:Goal)
%
%   Run Goal with atom garbage collection disabled. Loading an RDF
%   database creates large amounts of atoms we *know* are not
%   garbage.  The previous value of the =agc_margin= flag is restored
%   when Goal completes.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, SavedMargin),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, SavedMargin)).


%!  rdf_detach_db is det.
%
%   Detach from the  current  database.   Succeeds  silently  if  no
%   database is attached. Normally called at  the end of the program
%   through at_halt/1.  Stops the monitor and closes all journals
%   before saving the prefixes and releasing the lock.

rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Directory))
    ->  debug(halt, 'DB Directory: ~w', [Directory]),
        save_prefixes(Directory),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        unlock_db(Directory)
    ;   true
    ).


%!  rdf_current_db(?Dir)
%
%   True if Dir is the current RDF persistent database.
rdf_current_db(Directory) :-
    rdf_directory(Dir),
    !,                                  % at most one attached database
    Dir = Directory.


%!  rdf_flush_journals(+Options)
%
%   Flush dirty journals.  Options:
%
%           * min_size(+KB)
%           Only flush if journal is over KB in size.
%           * graph(+Graph)
%           Only flush the journal of Graph
%
%   @tbd Provide a default for min_size?

rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),   % unbound Graph: all graphs
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

%   rdf_flush_journal(+Graph, +Options)
%
%   Replace the journal of Graph by a fresh snapshot using create_db/1,
%   unless there is no journal file or it is smaller than the
%   min_size(KB) option.

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),         % File is the path inside the store
    (   \+ exists_file(File)
    ->  true
    ;   option(min_size(KB), Options),
        size_file(File, Size),          % measure the resolved path
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).

                 /*******************************
                 *             LOAD             *
                 *******************************/

%!  load_db is det.
%
%   Reload database from the directory specified by rdf_directory/1.
%   First we find all named graphs using find_dbs/4 and then we load
%   them: all snapshots first, followed by all journals.  Finally a
%   summary message with the graph count, the number of triples added
%   and the CPU/wall time is printed.

load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals,  Journals,  Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples is Triples1 - Triples0,     % a number, not an A-B term
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).

load_sources(_, [], _, _) :- !.
% Non-empty list of sources: announce the job and run one load_source/4
% goal per source on at most Jobs threads.  The announcement honours the
% silent option in the same way as the other restore messages.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    message_level(Silent, Level),
    print_message(Level, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).


%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)
%
%   Goals is a list of load_source(DB, Silent, I, Total) terms, one for
%   each element of DBs, where I counts up from Index.

make_goals([], _, _, _, []).
make_goals([DB|T0], Silent, I, Total,
           [load_source(DB, Silent, I, Total)|T]) :-
    I2 is I + 1,
    make_goals(T0, Silent, I2, Total, T).

%   verbosity(-Silent)
%
%   Silent is the value of the silent option if it was given, =true= if
%   the Prolog flag =verbose= is =silent= and =brief= otherwise.

verbosity(Silent) :-
    rdf_option(silent(Silent)),
    !.
verbosity(Silent) :-
    current_prolog_flag(verbose, silent),
    !,
    Silent = true.
verbosity(brief).


%!  concurrency(-Jobs)
%
%   Number of jobs to run concurrently: the concurrency option if
%   given, else the =cpu_count= flag if it is positive, else 1.

concurrency(Jobs) :-
    rdf_option(concurrency(Jobs)),
    !.
concurrency(Jobs) :-
    current_prolog_flag(cpu_count, Jobs),
    Jobs > 0,
    !.
concurrency(1).

%   cpu_stat_key(+Jobs, -Key)
%
%   Key for statistics/2 used to measure CPU time: the calling thread
%   for a single job and the whole process otherwise.

cpu_stat_key(1, cputime) :- !.
cpu_stat_key(_, process_cputime).


%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
%
%   Scan the persistent database and return a list of snapshots and
%   journals, both sorted by file-size.  Each term is of the form
%
%     ==
%     db(Size, Ext, DB, DBFile, Depth)
%     ==
%
%   If consider_reindex_db/3 decides to reorganise the directory
%   structure, the directory is scanned again.

find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Files),
    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
    maplist(db_graph, Scanned, UnsortedGraphs),
    sort(UnsortedGraphs, Graphs),
    (   consider_reindex_db(Dir, Graphs, Scanned)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
        sort(Snapshots, SnapBySize),    % Size is the first argument
        sort(Journals, JournalBySize)
    ).

%   consider_reindex_db(+Dir, +Graphs, +Scanned) is semidet.
%
%   Succeeds after calling reindex_db/2 if the directory structure must
%   be reorganised.  That is the case if the scanned files do not all
%   live at the same depth or if the number of graphs demands more
%   directory levels than currently used.  If the current layout is
%   adequate, the depth found is stored as the directory_levels option
%   and the predicate fails.  Also fails if there are no graphs.

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, GraphCount),
    GraphCount > 0,
    Needed is floor(log(GraphCount)/log(256)),
    (   maplist(depth_db(Current), Scanned)     % all at the same depth?
    ->  (   Needed > Current
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(Current))),
            fail
        )
    ;   true
    ),
    reindex_db(Dir, Needed).

%   Accessors for the db(Size, Ext, DB, File, Depth) terms produced by
%   scan_db_files//4.

db_is_snapshot(DBTerm) :-
    arg(2, DBTerm, trp).

db_graph(DBTerm, Graph) :-
    arg(3, DBTerm, Graph).

db_file_name(DBTerm, Path) :-
    arg(4, DBTerm, Path).

depth_db(Depth, DBTerm) :-
    arg(5, DBTerm, Depth).

%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
%
%   Produces a list of db(Size, Ext, DB, File, Depth) terms for all
%   recognised RDF database files.  File is built by joining the
%   database directory Dir, the directory Prefix relative to Dir and
%   the file name.  Sub directories that are not symbolic links are
%   scanned recursively with Depth incremented; all other entries are
%   skipped.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Special|Rest], Dir, Prefix, Depth) -->
    { nofollow(Special) },
    !,
    scan_db_files(Rest, Dir, Prefix, Depth).
scan_db_files([Entry|Rest], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, Entry),
      db_extension(Ext),
      !,
      rdf_db_to_file(Graph, Base),
      directory_file_path(Prefix, Entry, Local),
      directory_file_path(Dir, Local, Path),
      size_file(Path, Size)
    },
    [ db(Size, Ext, Graph, Path, Depth) ],
    scan_db_files(Rest, Dir, Prefix, Depth).
scan_db_files([Entry|Rest], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, Entry, SubPrefix),
      directory_file_path(Dir, SubPrefix, SubDir),
      exists_directory(SubDir),
      \+ read_link(SubDir, _, _),               % Do not follow links
      !,
      directory_files(SubDir, SubEntries),
      SubDepth is Depth + 1
    },
    scan_db_files(SubEntries, Dir, SubPrefix, SubDepth),
    scan_db_files(Rest, Dir, Prefix, Depth).
scan_db_files([_|Rest], Dir, Prefix, Depth) -->
    scan_db_files(Rest, Dir, Prefix, Depth).

%   Directory entries that are never descended into.
nofollow('.').
nofollow('..').

%   File extensions of snapshots (trp) and journals (jrn).
db_extension(trp).
db_extension(jrn).

:- public load_source/4.                % called through make_goals/5

%!  load_source(+DB, +Silent, +Nth, +Total) is det.
%
%   Restore one file.  DB is a db(Size, Ext, Graph, File, Depth) term.
%   Snapshots are loaded using rdf_load_db/1 and journals are replayed
%   using load_journal/2.  Nth and Total are only used for progress
%   messages.

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Count0),
    statistics(cputime, T0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    graph_triple_count(Graph, Count1),
    Count is Count1 - Count0,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count, Nth, Total)))).


%   graph_triple_count(+Graph, -Count)
%
%   Count is the number of triples in Graph or 0 if rdf_statistics/1
%   has no information about Graph.

graph_triple_count(Graph, Count) :-
    rdf_statistics(triples_by_graph(Graph, Count)),
    !.
graph_triple_count(_, 0).


%!  attach_graph(+Graph, +Options) is det.
%
%   Load triples and reload journal from the indicated snapshot
%   file.  Options processed:
%
%           * silent(+Silent)
%           Passed to the progress messages.  If =true=, messages are
%           printed at level =silent=.  Default is =false=.

attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false),     % bind Silent for messages
    message_level(Silent, Level),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    % load_journal/2 uses plain open/4, so it needs the path inside the
    % store rather than the store-relative name.
    db_file(JournalFile, AbsJournal),
    (   exists_file(AbsJournal)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(AbsJournal, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    graph_triple_count(Graph, Count),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).

%   message_level(+Silent, -Level)
%
%   Map the silent option onto a print_message/2 level.

message_level(true, silent) :- !.
message_level(_, informational).


                 /*******************************
                 *         LOAD JOURNAL         *
                 *******************************/

%!  load_journal(+File:atom, +DB:atom) is det.
%
%   Process transactions from the RDF journal File, adding the given
%   named graph.  File is read as UTF-8.
%
%   @error type_error(journal_term, Term) if the journal contains a
%          term that is not understood.

load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, In, [encoding(utf8)]),
        ( read(In, T0),
          process_journal(T0, In, DB)
        ),
        close(In)).

process_journal(end_of_file, _, _) :- !.
process_journal(Term, In, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(In, T2),
    process_journal(T2, In, DB).

%   process_journal_term(+Term, +DB) is semidet.
%
%   Replay a single journal term on graph DB.  Terms that merely mark
%   the opening/closing of the journal or the boundaries of logged
%   transactions are accepted and ignored.

process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S,P,O,DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S,P,O,DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S,P,O,DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S,P,O,DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S,P,O,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).      % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).      % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
process_journal_term(end(_,_,_), _).


                 /*******************************
                 *        CREATE JOURNAL        *
                 *******************************/

:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id

%!  rdf_persistency(+DB, Bool)
%
%   Specify whether a database is persistent.  Switching to =false=
%   kills the persistent state.  Switching to =true= creates it.

rdf_persistency(DB, Bool) :-            % type checking only
    must_be(atom, DB),
    must_be(boolean, Bool),
    fail.
% Switching persistency off blocks the graph and removes its files;
% switching it on again unblocks the graph and saves it.  Both are
% no-ops if the graph is already in the requested state.
rdf_persistency(Graph, false) :-
    !,
    (   blocked_db(Graph, persistency)
    ->  true
    ;   assert(blocked_db(Graph, persistency)),
        delete_db(Graph)
    ).
rdf_persistency(Graph, true) :-
    (   retract(blocked_db(Graph, persistency))
    ->  create_db(Graph)
    ;   true
    ).

%!  rdf_db:property_of_graph(?Property, +Graph) is nondet.
%
%   Extend rdf_graph_property/2 with new properties.  This adds
%   persistent(State), where State is =false= if persistency has been
%   switched off for Graph and =true= otherwise.

:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).


%!  start_monitor is det.
%!  stop_monitor is det.
%
%   Start/stop monitoring the RDF database for changes and update
%   the journal.

start_monitor :-
    rdf_monitor(monitor,
                [ -assert(load)
                ]).
stop_monitor :-
    rdf_monitor(monitor,
                [ -all
                ]).

%!  monitor(+Term) is semidet.
%
%   Handle an rdf_monitor/2 callback to deal with persistency.  Note
%   that the monitor calls that come from rdf_db.pl that deal with
%   database changes are serialized.  They do come from different
%   threads though.
%
%   Changes to a graph that is currently blocked (see blocked_db/2) are
%   not journalled.

monitor(Event) :-
    debug(monitor, 'Monitor: ~p~n', [Event]),
    fail.
monitor(assert(S,P,O,Graph:Line)) :-
    !,
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Journal),
    open_transaction(Graph, Journal),
    format(Journal, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(Graph, Journal).
monitor(assert(S,P,O,Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Journal),
    open_transaction(Graph, Journal),
    format(Journal, '~q.~n', [assert(S,P,O)]),
    sync_journal(Graph, Journal).
monitor(retract(S,P,O,Graph:Line)) :-
    !,
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Journal),
    open_transaction(Graph, Journal),
    format(Journal, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(Graph, Journal).
monitor(retract(S,P,O,Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Journal),
    open_transaction(Graph, Journal),
    format(Journal, '~q.~n', [retract(S,P,O)]),
    sync_journal(Graph, Journal).
% Updates.  Moving a triple to another graph is journalled as an assert
% in the new graph and a retract in the old one.  All other updates are
% written as update/4 terms.  As for assert and retract, the journal is
% first made part of the running logged transaction (if any) using
% open_transaction/2.
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)                % binary load completed
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).
                                        % TBD: Remove empty directories?

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

%   monitor_transaction(+Id, +BeginOrEnd)
%
%   Handle begin(Nesting) and end(Nesting) of a transaction identified
%   by Id.  While a journal is being replayed or a graph is parsed or
%   unloaded, the graph is blocked such that individual changes are not
%   journalled.  At the end of a parse the graph is saved as a snapshot
%   and at the end of an unload its files are deleted.  log(Msg)
%   transactions record the message used by open_transaction/2.

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
% End of a logged transaction: forget the message and write an end term
% to the journal of every graph that took part in it.
monitor_transaction(log(_), end(Level)) :-
    check_nested(Level),
    retract(transaction_message(Level, _, _)),
    !,
    findall(Graph:Id, retract(transaction_db(Level, Graph, Id)), Involved),
    end_transactions(Involved, Level).
monitor_transaction(log(Message, Graph), begin(Level)) :-
    !,
    check_nested(Level),
    get_time(Now),
    asserta(transaction_message(Level, Now, Message)),
    journal_fd(Graph, Journal),
    open_transaction(Graph, Journal).
monitor_transaction(log(Message, _Graph), end(Level)) :-
    monitor_transaction(log(Message), end(Level)).


%!  check_nested(+Level) is semidet.
%
%   True if we must log this transaction. This is always the case
%   for toplevel transactions. Nested transactions are only logged
%   if log_nested_transactions(true) is defined.

check_nested(Level) :-
    (   Level = 0
    ->  true
    ;   rdf_option(log_nested_transactions(true))
    ).


%!  open_transaction(+DB, +Fd) is det.
%
%   Add a begin(Id, Level, Time, Message) term if a transaction
%   involves DB. Id is an incremental integer, where each database
%   has its own counter. Level is the nesting level, Time a floating
%   point timestamp and Message the message provided as argument to
%   the log message.  Nothing is written if no logged transaction is
%   in progress or DB is already part of it.

open_transaction(DB, Fd) :-
    (   transaction_message(Level, Time, Message)
    ->  (   transaction_db(Level, DB, _)
        ->  true
        ;   next_transaction_id(DB, Id),
            assert(transaction_db(Level, DB, Id)),
            RoundedTime is round(Time*100)/100,
            format(Fd, '~q.~n', [begin(Id, Level, RoundedTime, Message)])
        )
    ;   true
    ).


%!  next_transaction_id(+DB, -Id) is det.
%
%   Id is the number to use for the next logged transaction on DB.
%   Transactions in each named graph are numbered in sequence.
%   Searching the Id of the last transaction is performed by the 2nd
%   clause starting 1Kb from the end and doubling this offset each
%   failure.

:- dynamic
    current_transaction_id/2.

% Clause 1: the last id is cached.  Clause 2: recover the last id from
% the end(Id, _, _) terms in the existing journal.  Clause 3: no journal
% yet, so start counting at 1.
next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    db_file(Journal, File),             % Journal is local to the store
    exists_file(File),
    !,
    size_file(File, Size),
    setup_call_cleanup(
        open_db(Journal, read, In, []),
        iterative_expand(In, Size, Last),
        close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).

%   iterative_expand(+In, +Size, -Last)
%
%   Last is the id of the last end(Id, _, _) term in the journal opened
%   as In, whose size is Size bytes, or 0 if there is no such term.

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
    Max is floor(log(Size)/log(2)),
    between(10, Max, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 10),                       % records are line-based
    read(In, T0),
    last_transaction_id(T0, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-        % Scan the whole file
    seek(In, 0, bof, _),
    read(In, T0),
    last_transaction_id(T0, In, 0, Last).

%   last_transaction_id(+Term, +In, +Id0, -Last) is det.
%
%   Read In to the end, starting with the already read Term.  Last is
%   the id of the last end(Id, _, _) term seen, or Id0 if there is
%   none.

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    !,                                  % do not retry as "other term"
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).


%!  end_transactions(+DBs:list(atom:id), +Nesting) is det.
%
%   End a transaction that affected the  given list of databases. We
%   write the list of other affected databases as an argument to the
%   end-term to facilitate fast finding of the related transactions.
%
%   In each database, the transaction is   ended with a term end(Id,
%   Nesting, Others), where  Id  and   Nesting  are  the transaction
%   identifier and nesting (see open_transaction/2)  and Others is a
%   list of DB:Id,  indicating  other   databases  affected  by  the
%   transaction.

end_transactions(DBs, N) :-
    end_transactions(DBs, DBs, N).

end_transactions([], _, _).
% Write end(Id, Nesting, Others) to the journal of each involved graph,
% where Others holds the remaining Graph:Id pairs of the transaction.
end_transactions([Graph:Id|Rest], Involved, Nesting) :-
    journal_fd(Graph, Journal),
    once(select(Graph:Id, Involved, Others)),
    format(Journal, 'end(~q, ~q, ~q).~n', [Id, Nesting, Others]),
    sync_journal(Graph, Journal),
    end_transactions(Rest, Involved, Nesting).


%!  sync_loaded_graphs(+Graphs)
%
%   Called after a binary triple has been loaded that added triples
%   to the given graphs.  Each graph is saved using create_db/1.

sync_loaded_graphs(Graphs) :-
    maplist(create_db, Graphs).


                 /*******************************
                 *         JOURNAL FILES        *
                 *******************************/

%!  journal_fd(+DB, -Stream) is det.
%
%   Get an open stream to a journal. If the journal is not open, old
%   journals are closed to satisfy   the =max_open_journals= option.
%   Then the journal is opened in   =append= mode. Journal files are
%   always encoded as UTF-8 for  portability   as  well as to ensure
%   full coverage of Unicode.
%
%   Opening is done while holding the mutex =rdf_journal_file=; the
%   table of open journals is checked again inside the lock.

journal_fd(DB, Stream) :-
    (   source_journal_fd(DB, Stream)
    ->  true
    ;   with_mutex(rdf_journal_file,
                   journal_fd_(DB, Opened)),
        Stream = Opened
    ).

journal_fd_(DB, Stream) :-
    source_journal_fd(DB, Stream),
    !.
journal_fd_(DB, Stream) :-
    limit_fd_pool,
    db_files(DB, _Snapshot, Journal),
    open_db(Journal, append, Stream,
            [ close_on_abort(false)
            ]),
    time_stamp(Now),
    format(Stream, '~q.~n', [start([time(Now)])]),
    assert(source_journal_fd(DB, Stream)).          % new one at the end

%!  limit_fd_pool is det.
%
%   Limit the number of  open   journals  to max_open_journals (10).
%   Note that calls  from  rdf_monitor/2   are  issued  in different
%   threads, but as they are part of write operations they are fully
%   synchronised.
%
%   NOTE(review): journal_fd_/2 calls this before it opens a new
%   journal, so the pool can hold one journal more than the limit
%   afterwards.  Confirm whether that is intended.

limit_fd_pool :-
    predicate_property(source_journal_fd(_, _), number_of_clauses(Open)),
    !,
    (   rdf_option(max_open_journals(Limit))
    ->  true
    ;   Limit = 10
    ),
    Excess is Open - Limit,
    forall(between(1, Excess, _),
           close_oldest_journal).
limit_fd_pool.

%   close_oldest_journal
%
%   Close the journal listed first in source_journal_fd/2, if any.
%   New journals are added at the end, so that is the one opened
%   longest ago.

close_oldest_journal :-
    (   source_journal_fd(Graph, _Stream)
    ->  debug(rdf_persistency, 'Closing old journal for ~q', [Graph]),
        close_journal(Graph)
    ;   true
    ).


%!  sync_journal(+DB, +Fd)
%
%   Sync journal represented by database and   stream.  If the DB is
%   involved in a transaction there is   no point flushing until the
%   end of the transaction.

sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).

%!  close_journal(+DB) is det.
%
%   Close the journal associated with DB if it is open.  An
%   end([time(Now)]) term is written before the stream is closed.
%   Runs while holding the mutex =rdf_journal_file=.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    (   retract(source_journal_fd(DB, Stream))
    ->  time_stamp(Now),
        format(Stream, '~q.~n', [end([time(Now)])]),
        close(Stream, [force(true)])
    ;   true
    ).

%!  close_journals
%
%   Close all open journals.  Errors while closing a journal are
%   printed and do not stop the remaining journals from being closed.

close_journals :-
    forall(source_journal_fd(Graph, _),
           catch(close_journal(Graph), Error,
                 print_message(error, Error))).

%!  create_db(+Graph)
%
%   Create a saved version of Graph in corresponding file, close and
%   delete journals.  An empty graph has its files deleted instead.
%   The snapshot is first written to a `.new' file and renamed after
%   the journal has been deleted.  If saving fails, a warning is
%   printed, the `.new' file is removed and the journal is kept.

create_db(Graph) :-
    \+ rdf(_,_,_,Graph),
    !,
    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
    delete_db(Graph).
create_db(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, SnapshotFile, JournalFile),
    atom_concat(SnapshotFile, '.new', TmpSnapshot),
    (   catch(( create_directory_levels(SnapshotFile),
                rdf_save_db(TmpSnapshot, Graph)
              ), SaveError,
              ( print_message(warning, SaveError),
                fail
              ))
    ->  (   exists_file(JournalFile)
        ->  delete_file(JournalFile)
        ;   true
        ),
        rename_file(TmpSnapshot, SnapshotFile),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
    ;   catch(delete_file(TmpSnapshot), _, true)
    ).


%!  delete_db(+DB)
%
%   Remove snapshot and journal file for DB.  The journal is closed
%   first.  Runs while holding the mutex =rdf_journal_file=.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    close_journal_(DB),
    db_abs_files(DB, SnapshotFile, JournalFile),
    !,
    (   exists_file(JournalFile)
    ->  delete_file(JournalFile)
    ;   true
    ),
    (   exists_file(SnapshotFile)
    ->  delete_file(SnapshotFile)
    ;   true
    ).
delete_db_(_).

                 /*******************************
                 *           LOCKING            *
                 *******************************/

%!  lock_db(+Dir)
%
%   Lock the database directory Dir.  The lock file is opened with a
%   non-blocking write lock and filled with a locked/1 term holding the
%   time, process id and host name.  The lock is recorded in rdf_lock/2
%   and released at halt.
%
%   @error permission_error(lock, rdf_db, Dir) if the lock file cannot
%          be opened or locked.

lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Lock, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0                     % TBD: Fix in Prolog
    ),
    time_stamp(Now),
    gethostname(Host),
    format(Lock, '/* RDF Database is in use */~n~n', []),
    format(Lock, '~q.~n', [ locked([ time(Now),
                                     pid(PID),
                                     host(Host)
                                   ])
                          ]),
    flush_output(Lock),
    set_end_of_stream(Lock),        % truncate older, longer content
    assert(rdf_lock(Dir, lock(Lock, LockFile))),
    at_halt(unlock_db(Dir)).

%   locked_error(+Access, +Dir)
%
%   Translate the permission error raised by open/4 into a
%   permission_error(lock, rdf_db, Dir).  If locking failed, the
%   context holds the content of the lock file when it can be read.

locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).

%!  unlock_db(+Dir) is det.
%!  unlock_db(+Stream, +File) is det.
%
%   Release the lock on Dir, if any, by closing the lock stream and
%   deleting the lock file.

unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Lock, LockFile)))
    ->  unlock_db(Lock, LockFile)
    ;   true
    ).

unlock_db(Lock, LockFile) :-
    close(Lock),
    delete_file(LockFile).
1173 1174 /******************************* 1175 * FILENAMES * 1176 *******************************/ 1177 1178lockfile(Dir, LockFile) :- 1179 atomic_list_concat([Dir, /, lock], LockFile). 1180 1181directory_levels(Levels) :- 1182 rdf_option(directory_levels(Levels)), 1183 !. 1184directory_levels(2). 1185 1186db_file(Base, File) :- 1187 rdf_directory(Dir), 1188 directory_levels(Levels), 1189 db_file(Dir, Base, Levels, File). 1190 1191db_file(Dir, Base, Levels, File) :- 1192 dir_levels(Base, Levels, Segments, [Base]), 1193 atomic_list_concat([Dir|Segments], /, File). 1194 1195open_db(Base, Mode, Stream, Options) :- 1196 db_file(Base, File), 1197 create_directory_levels(File), 1198 open(File, Mode, Stream, [encoding(utf8)|Options]). 1199 1200create_directory_levels(_File) :- 1201 rdf_option(directory_levels(0)), 1202 !. 1203create_directory_levels(File) :- 1204 file_directory_name(File, Dir), 1205 make_directory_path(Dir). 1206 1207exists_db(Base) :- 1208 db_file(Base, File), 1209 exists_file(File). 1210 1211%! dir_levels(+File, +Levels, ?Segments, ?Tail) is det. 1212% 1213% Create a list of intermediate directory names for File. Each 1214% directory consists of two hexadecimal digits. 1215 1216dir_levels(_, 0, Segments, Segments) :- !. 1217dir_levels(File, Levels, Segments, Tail) :- 1218 rdf_atom_md5(File, 1, Hash), 1219 create_dir_levels(Levels, 0, Hash, Segments, Tail). 1220 1221create_dir_levels(0, _, _, Segments, Segments) :- !. 1222create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :- 1223 sub_atom(Hash, S, 2, _, S1), 1224 S2 is S+2, 1225 N2 is N-1, 1226 create_dir_levels(N2, S2, Hash, Segments0, Tail). 1227 1228 1229%! db_files(+DB, -Snapshot, -Journal). 1230%! db_files(-DB, +Snapshot, -Journal). 1231%! db_files(-DB, -Snapshot, +Journal). 1232% 1233% True if named graph DB is represented by the files Snapshot and 1234% Journal. The filenames are local to the directory representing 1235% the store. 

db_files(DB, Snapshot, Journal) :-
    (   nonvar(DB)
    ->  rdf_db_to_file(DB, Base),
        atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal)
    ;   nonvar(Snapshot)
    ->  atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal),
        rdf_db_to_file(DB, Base)
    ;   nonvar(Journal)
    ->  atom_concat(Base, '.jrn', Journal),
        atom_concat(Base, '.trp', Snapshot),
        rdf_db_to_file(DB, Base)
    ).

%   db_abs_files(?DB, ?Snapshot, ?Journal)
%
%   As db_files/3, but Snapshot and Journal are mapped through
%   db_file/2 to their location in the database directory.

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, LocalSnapshot, LocalJournal),
    db_file(LocalSnapshot, Snapshot),
    db_file(LocalJournal, Journal).


%!  rdf_journal_file(+Graph, -File) is semidet.
%!  rdf_journal_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing journal file for Graph.
%   If Graph is unbound, the graphs known to rdf_graph/1 are
%   enumerated.

rdf_journal_file(Graph, Journal) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).


%!  rdf_snapshot_file(+Graph, -File) is semidet.
%!  rdf_snapshot_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing snapshot file for
%   Graph.  If Graph is unbound, the graphs known to rdf_graph/1 are
%   enumerated.

rdf_snapshot_file(Graph, Snapshot) :-
    (   var(Graph)
    ->  rdf_graph(Graph)    % also pick the empty graphs
    ;   true
    ),
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).


%!  rdf_db_to_file(+DB, -File) is det.
%!  rdf_db_to_file(-DB, +File) is det.
%
%   Translate between database encoding (often a file or URL) and
%   the name we store in the directory.  Translations are cached in
%   file_base_db/2 for two reasons.  Speed, but much more important
%   is that the mapping of raw --> encoded provided by
%   www_form_encode/2 is not guaranteed to be unique by the W3C
%   standards.

rdf_db_to_file(DB, File) :-
    (   file_base_db(File, DB)
    ->  true
    ;   url_to_filename(DB, File),
        assert(file_base_db(File, DB))
    ).

%!  url_to_filename(+URL, -FileName) is det.
%!  url_to_filename(-URL, +FileName) is det.
%
%   Turn a valid URL into a filename.  Earlier versions used
%   www_form_encode/2, but this can produce characters that are not
%   valid in filenames.  We use the same kind of encoding with our
%   own rules for the allowed characters: US-ASCII alphanumerical
%   characters and =|_|= are copied, a space becomes =|+|=, a newline
%   (with or without preceding carriage return) becomes =|%0D%0A|=
%   and any other character is written as =|%XX|=.  If URL is not
%   atomic, the translation is delegated to uri_encoded/3 with
%   FileName as the encoded form.
%
%   NOTE(review): the =|%XX|= escape writes only the low 8 bits of a
%   character code, so code points above 255 appear to lose
%   information -- confirm whether such graph names can occur.

url_to_filename(URL, FileName) :-
    (   atomic(URL)
    ->  atom_codes(URL, Plain),
        phrase(url_encode(Encoded), Plain),
        atom_codes(FileName, Encoded)
    ;   uri_encoded(path, URL, FileName)
    ).

%   url_encode(-Encoded)//
%
%   Parse the plain character codes and produce the Encoded codes.
%   The order of the clauses matters: the generic escape in the last
%   clause applies only to what the earlier clauses rejected.

url_encode([0'+|Rest]) -->
    " ",
    !,
    url_encode(Rest).
url_encode([Code|Rest]) -->
    (   alphanum(Code)
    ;   no_enc_extra(Code)
    ),
    !,
    url_encode(Rest).
url_encode(Encoded) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { Encoded = [0'%,0'0,0'D,0'%,0'0,0'A|Rest] },
    url_encode(Rest).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,HiDigit,LoDigit|Rest]) -->
    [Code],
    { Hi is (Code>>4 /\ 0xf),
      Lo is (Code /\ 0xf),
      code_type(HiDigit, xdigit(Hi)),
      code_type(LoDigit, xdigit(Lo))
    },
    url_encode(Rest).

%   eos//
%
%   True at the end of the input.

eos([], []).

%   alphanum(-Code)//
%
%   Accept one US-ASCII alphanumerical character.

alphanum(Code) -->
    [Code],
    { Code < 128,                       % US-ASCII
      code_type(Code, alnum)
    }.

%   no_enc_extra(-Code)//
%
%   Accept the non-alphanumerical characters that are copied as-is.

no_enc_extra(0'_) --> "_".


                 /*******************************
                 *             REINDEX          *
                 *******************************/

%!  reindex_db(+Dir, +Levels)
%
%   Reindex the database by creating intermediate directories.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Files),
    % Move every database file to the place db_file/4 computes for
    % Levels, then prune the directories this left empty.
    reindex_files(Files, Dir, '.', 0, Levels),
    remove_empty_directories(Files, Dir).

%   reindex_files(+Entries, +Dir, +Prefix, +CLevel, +Levels)
%
%   Entries are the members of the directory Prefix, which is
%   relative to the database directory Dir and CLevel levels below
%   it.  For each entry, the first applicable rule is used:
%
%     1. Entries accepted by nofollow/1 are skipped.
%     2. A file whose extension is accepted by db_extension/1 and
%        that is not at depth Levels is renamed to the location
%        computed by db_file/4.
%     3. A directory that is not a symbolic link is processed
%        recursively.
%     4. Anything else is left alone.

reindex_files([], _, _, _, _).
reindex_files([Skip|Entries], Dir, Prefix, CLevel, Levels) :-
    nofollow(Skip),
    !,
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).
reindex_files([File|Entries], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, File),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, File, DBFile),
    directory_file_path(Dir, DBFile, OldPath),
    db_file(Dir, File, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).
reindex_files([SubDir|Entries], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, SubDir, RelDir),
    directory_file_path(Dir, RelDir, AbsDir),
    exists_directory(AbsDir),
    \+ read_link(AbsDir, _, _),         % Do not follow links
    !,
    directory_files(AbsDir, SubEntries),
    SubLevel is CLevel + 1,
    reindex_files(SubEntries, Dir, RelDir, SubLevel, Levels),
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).
reindex_files([_|Entries], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Entries, Dir, Prefix, CLevel, Levels).


%   remove_empty_directories(+Entries, +Dir)
%
%   Entries are members of the directory Dir.  Each entry that is a
%   real directory (not a symbolic link and not accepted by
%   nofollow/1) is pruned bottom-up: first its sub directories are
%   processed, then the directory itself is deleted if nothing but
%   nofollow/1 entries is left in it.  Checking after the recursion
%   also removes directories that only contained directories that
%   were deleted themselves.

remove_empty_directories([], _).
remove_empty_directories([Entry|Entries], Dir) :-
    \+ nofollow(Entry),
    directory_file_path(Dir, Entry, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content0),
    exclude(nofollow, Content0, Content),
    remove_empty_directories(Content, Path),
    % list again: the recursive call may have emptied Path
    directory_files(Path, Left0),
    exclude(nofollow, Left0, Left),
    (   Left == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   true
    ),
    remove_empty_directories(Entries, Dir).
remove_empty_directories([_|Entries], Dir) :-
    remove_empty_directories(Entries, Dir).


                 /*******************************
                 *            PREFIXES          *
                 *******************************/

%   save_prefixes(+Dir)
%
%   Write all prefixes known to rdf_current_ns/2 as prefix/2 terms to
%   the UTF-8 encoded file Dir/prefixes.db.

save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(
        open(PrefixFile, write, Out, [encoding(utf8)]),
        write_prefixes(Out),
        close(Out)).

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).

%!  load_prefixes(+RDFDBDir) is det.
%
%   If the file RDFDBDir/prefixes.db exists, load the prefixes.  The
%   prefixes are registered using rdf_register_ns/3.  Possible errors
%   because the prefix definitions have changed are printed as
%   warnings, retaining the old definition.  Note that changing
%   prefixes generally requires reloading all RDF from the source.
%
%   @error type_error(atom, X) if an alias or URI is not an atom.
%   @error domain_error(prefix_term, Term) if the file holds a term
%          that is not of the shape prefix(Alias, URI).

load_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    (   exists_file(PrefixFile)
    ->  setup_call_cleanup(
            open(PrefixFile, read, In, [encoding(utf8)]),
            read_prefixes(In),
            close(In))
    ;   true
    ).

read_prefixes(Stream) :-
    read_term(Stream, First, []),
    read_prefixes(First, Stream).

%   read_prefixes(+Term, +Stream)
%
%   Process Term, which was just read from Stream, and continue with
%   the terms that follow until end of file.

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), Stream) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []), Error,
          print_message(warning, Error)),
    read_term(Stream, Next, []),
    read_prefixes(Next, Stream).
read_prefixes(Term, _) :-
    domain_error(prefix_term, Term).


                 /*******************************
                 *              UTIL            *
                 *******************************/

%!  mkdir(+Directory)
%
%   Create a directory if it does not already exist.

mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true
    ;   make_directory(Directory)
    ).

%!  time_stamp(-Integer)
%
%   Return time-stamp rounded to integer.

time_stamp(Int) :-
    get_time(Now),
    Int is round(Now).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3,
    prolog:message_context/3.

prolog:message(rdf(Term)) -->
    message(Term).

%   message(+Term)//
%
%   Translate the argument of an rdf(Term) message.  Message
%   translation commits to the first clause that produces a result,
%   so the catch-all clause for restore(_, Graph) must come after all
%   other restore/2 clauses; otherwise it hides them.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    % Percent defaults to 0 if it cannot be computed (e.g. Wall is 0)
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
% catch-all: anything else is taken to be the graph being restored
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

%   silent_message(+Action)//
%
%   Produce no output at all.

silent_message(_Action) --> [].

%   brief_message(+Action)//
%
%   Produce a single progress line that is overwritten for every
%   graph that has been completed.  Other actions produce no output.

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].


%   Extend the permission error raised when the database is locked
%   with the time and process id found in the lock file.

prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Time), Args),
      memberchk(pid(Pid), Args),
      format_time(string(S), '%+', Time)
    },
    [ nl,
      'locked at ~s by process id ~w'-[S,Pid]
    ].