1/*  Part of SWI-Prolog
2
3    Author:        Jan Wielemaker
4    E-mail:        J.Wielemaker@vu.nl
5    WWW:           http://www.swi-prolog.org
6    Copyright (c)  2006-2015, University of Amsterdam
7                              VU University Amsterdam
8    All rights reserved.
9
10    Redistribution and use in source and binary forms, with or without
11    modification, are permitted provided that the following conditions
12    are met:
13
14    1. Redistributions of source code must retain the above copyright
15       notice, this list of conditions and the following disclaimer.
16
17    2. Redistributions in binary form must reproduce the above copyright
18       notice, this list of conditions and the following disclaimer in
19       the documentation and/or other materials provided with the
20       distribution.
21
22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33    POSSIBILITY OF SUCH DAMAGE.
34*/
35
36:- module(rdf_persistency,
37          [ rdf_attach_db/2,            % +Directory, +Options
38            rdf_detach_db/0,            % +Detach current Graph
39            rdf_current_db/1,           % -Directory
40            rdf_persistency/2,          % +Graph, +Bool
41            rdf_flush_journals/1,       % +Options
42            rdf_persistency_property/1, % ?Property
43            rdf_journal_file/2,         % ?Graph, ?JournalFile
44            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
45            rdf_db_to_file/2            % ?Graph, ?FileBase
46          ]).
47:- use_module(library(semweb/rdf_db),
48              [ rdf_graph/1, rdf_unload_graph/1, rdf_statistics/1,
49                rdf_load_db/1, rdf_retractall/4, rdf_create_graph/1,
50                rdf_assert/4, rdf_update/5, rdf_monitor/2, rdf/4,
51                rdf_save_db/2, rdf_atom_md5/3, rdf_current_ns/2,
52                rdf_register_ns/3
53              ]).
54
55:- autoload(library(apply),[maplist/2,maplist/3,partition/4,exclude/3]).
56:- autoload(library(debug),[debug/3]).
57:- autoload(library(error),
58	    [permission_error/3,must_be/2,domain_error/2]).
59:- autoload(library(filesex),
60	    [directory_file_path/3,make_directory_path/1]).
61:- autoload(library(lists),[select/3,append/3]).
62:- autoload(library(option),[option/2,option/3]).
63:- autoload(library(readutil),[read_file_to_terms/3]).
64:- autoload(library(socket),[gethostname/1]).
65:- autoload(library(thread),[concurrent/3]).
66:- autoload(library(uri),[uri_encoded/3]).
67
68/** <module> RDF persistency plugin
69
This module provides persistency for rdf_db.pl based on the
rdf_monitor/2 predicate to track changes to the repository.  Where
previous versions used autosave of the whole database using the
quick-load format of rdf_db, this version is based on a quick-load file
per source (4th argument of rdf/4), and journalling for edit operations.

The result is safe, avoids frequent small changes to large files (which
make synchronisation and backup expensive) and avoids long disruption of
the server while doing the autosave.  Only loading large files disrupts
service for some time.
80
81The persistent backup of the database is  realised in a directory, using
82a lock file to avoid corruption due to concurrent access. Each source is
83represented by two files, the latest snapshot   and a journal. The state
84is restored by loading  the  snapshot   and  replaying  the journal. The
85predicate rdf_flush_journals/1 can be used to create fresh snapshots and
86delete the journals.
87
88@tbd If there is a complete `.new'   snapshot  and no journal, we should
89     move the .new to the plain snapshot name as a means of recovery.
90
91@tbd Backup of each graph using one or two files is very costly if there
92     are many graphs.  Although the currently used subdirectories avoid
93     hitting OS limits early, this is still not ideal. Probably we
94     should collect (small, older?) files and combine them into a single
95     quick load file.  We could call this (similar to GIT) a `pack'.
96
97@see    rdf_edit.pl
98*/
99
% State that must not be saved into a saved state (volatile) and that is
% modified at runtime (dynamic).
:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

:- meta_predicate
    no_agc(0).

% Options accepted by rdf_attach_db/2.  This list mirrors the options
% documented for rdf_attach_db/2, including directory_levels/1, which
% assert_options/1 accepts but which was missing here.
:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([auto,read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       directory_levels(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
123
%!  rdf_attach_db(+Directory, +Options) is det.
%
%   Start persistent operations using Directory   as  place to store
%   files.   There are several cases:
%
%           * Empty DB, existing directory
%           Load the DB from the existing directory
%
%           * Full DB, empty directory
%           Create snapshots for all sources in directory
%
%   Options:
%
%           * access(+AccessMode)
%           One of =auto= (default), =read_write= or
%           =read_only=. Read-only access implies that the RDF
%           store is not locked. It is read at startup and all
%           modifications to the data are temporary. The default
%           =auto= mode is =read_write= if the directory is
%           writeable and the lock can be acquired.  Otherwise
%           it reverts to =read_only=.  Whenever the database ends
%           up attached read-only, access(read_only) is recorded so
%           that rdf_persistency_property(access(read_only)) holds.
%
%           * concurrency(+Jobs)
%           Number of threads to use for loading the initial
%           database.  If not provided it is the number of CPUs
%           as obtained from the flag =cpu_count=.
%
%           * max_open_journals(+Count)
%           Maximum number of journals kept open.  If not provided,
%           the default is 10.  See limit_fd_pool/0.
%
%           * directory_levels(+Count)
%           Number of levels of intermediate directories for storing
%           the graph files.  Default is 2.
%
%           * silent(+BoolOrBrief)
%           If =true= (default =false=), do not print informational
%           messages.  Finally, if =brief= it will show minimal
%           feedback.
%
%           * log_nested_transactions(+Boolean)
%           If =true=, nested _log_ transactions are added to the
%           journal information.  By default (=false=), no log-term
%           is added for nested transactions.
%
%   @error existence_error(source_sink, Directory)
%   @error permission_error(write, directory, Directory)

rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   % Use the ISO error(Formal, Context) shape so that
        % print_message/2 can translate the message.
        print_message(warning,
                      error(permission_error(write, directory, Directory), _)),
        print_message(warning, rdf(read_only)),
        % Record the effective access mode, as the other fallbacks do.
        rdf_attach_db_ro(Directory, [access(read_only)|Options])
    ).
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).
216
217
%   rdf_attach_db_rw(+DirSpec, +Options)
%
%   Attach the database in read-write mode.  Three situations are tried
%   in order:
%     1. DirSpec resolves to a writable directory: lock it and load it
%        (nothing is done if it is already the current database).
%     2. Some resolution of DirSpec is, or can be created as, a writable
%        directory: attach that through rdf_attach_db/2.
%     3. Otherwise raise an existence or permission error.

rdf_attach_db_rw(DirSpec, Options) :-
    (   absolute_file_name(DirSpec, Dir,
                           [ access(write),
                             file_type(directory),
                             file_errors(fail)
                           ])
    ->  (   rdf_directory(Dir)
        ->  true                    % already attached; update settings?
        ;   rdf_detach_db,
            mkdir(Dir),
            lock_db(Dir),
            assertz(rdf_directory(Dir)),
            assert_options(Options),
            stop_monitor,           % loading must not be journalled
            no_agc(load_db),
            at_halt(rdf_detach_db),
            start_monitor
        )
    ;   absolute_file_name(DirSpec, Candidate, [ solutions(all) ]),
        (   exists_directory(Candidate)
        ->  access_file(Candidate, write)
        ;   catch(make_directory(Candidate), _, fail)
        )
    ->  rdf_attach_db(Candidate, Options)
    ;   absolute_file_name(DirSpec, Existing,   % raises existence error
                           [ access(exist),
                             file_type(directory)
                           ]),
        permission_error(write, directory, Existing)
    ).
256
%!  rdf_attach_db_ro(+Directory, +Options)
%
%   Attach the database in Directory read-only: detach any current
%   database, record the directory and options, and load the data with
%   the monitor switched off, so that nothing is written to journals.

rdf_attach_db_ro(Directory, Options) :-
    rdf_detach_db,
    assertz(rdf_directory(Directory)),
    assert_options(Options),
    stop_monitor,                   % loading must not be journalled
    no_agc(load_db).
267
268
%   assert_options(+Options)
%
%   Validate and record every option for which option_type/2 defines a
%   check as rdf_option/1.  Options without a check are ignored; options
%   that fail their check raise the error produced by must_be/2.

assert_options([]).
assert_options([Option|Options]) :-
    (   option_type(Option, Check)
    ->  call(Check),
        assertz(rdf_option(Option))
    ;   true                        % ignore options we do not understand
    ),
    assert_options(Options).

%   option_type(?Option, -Check)
%
%   Check is the goal that validates the argument of Option.  access/1
%   accepts =auto= as documented for rdf_attach_db/2; previously
%   passing access(auto) explicitly raised a domain error.

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([auto,
                                                       read_write,
                                                       read_only]), X)).
285
286
%!  rdf_persistency_property(?Property) is nondet.
%
%   True when Property is a property of the current persistent database.
%   Exposes the properties that can be passed as options to
%   rdf_attach_db/2.  Specifically,
%   rdf_persistency_property(access(read_only)) is true iff the database
%   is mounted in read-only mode.  In addition, the following property
%   is supported:
%
%     - directory(Dir)
%     The directory in which the database resides.
%
%   When Property is unbound all properties are enumerated; otherwise
%   only the first matching one is returned.

rdf_persistency_property(Property) :-
    (   var(Property)
    ->  rdf_persistency_property_(Property)
    ;   once(rdf_persistency_property_(Property))
    ).

rdf_persistency_property_(Property) :-
    rdf_option(Property).
rdf_persistency_property_(directory(Dir)) :-
    rdf_directory(Dir).
311
%!  no_agc(:Goal)
%
%   Call Goal with the agc_margin flag set to 0, which disables atom
%   garbage collection.  The previous value of the flag is restored
%   through setup_call_cleanup/3, also if Goal fails or raises an
%   exception.  Loading an RDF database creates many atoms that we
%   *know* are not garbage.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, Saved),
    setup_call_cleanup(set_prolog_flag(agc_margin, 0),
                       Goal,
                       set_prolog_flag(agc_margin, Saved)).
324
325
%!  rdf_detach_db is det.
%
%   Detach from the current database: stop the monitor and close the
%   journals.  If a directory is attached, also save the prefixes,
%   forget the options and journal streams, and release the lock.
%   Succeeds silently if no database is attached.  Normally called at
%   the end of the program through at_halt/1.

rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Directory))
    ->  debug(halt, 'DB Directory: ~w', [Directory]),
        save_prefixes(Directory),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        unlock_db(Directory)
    ;   true                        % nothing attached
    ).
344
345
%!  rdf_current_db(?Dir)
%
%   True if Dir is the directory of the current RDF persistent
%   database.  Only the first recorded directory is considered.

rdf_current_db(Directory) :-
    once(rdf_directory(Current)),
    Current = Directory.
354
355
%!  rdf_flush_journals(+Options)
%
%   Flush dirty journals by creating a fresh snapshot for the graph.
%   Options:
%
%           * min_size(+KB)
%           Only flush if journal is over KB in size.
%           * graph(+Graph)
%           Only flush the journal of Graph
%
%   @tbd Provide a default for min_size?

rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

%   rdf_flush_journal(+Graph, +Options)
%
%   Recreate the snapshot of Graph if it has a journal file and that
%   journal is not smaller than the min_size(KB) option.  The size is
%   taken from the same resolved path (via db_file/2) that is used for
%   the existence test; before, size_file/2 was applied to the
%   unresolved journal name.

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).
383
384                 /*******************************
385                 *             LOAD             *
386                 *******************************/
387
%!  load_db is det.
%
%   Reload database from the directory specified by rdf_directory/1.
%   First we find all named graphs using find_dbs/4; then we load all
%   snapshots followed by all journals, using concurrency/1 jobs.  An
%   `attached' message reports the number of graphs, the number of
%   triples added and the CPU and wall time used.

load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals, Journals, Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples is Triples1 - Triples0, % a number, not a '-'/2 term
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
416
%   load_sources(+Type, +Sources, +Silent, +Jobs)
%
%   Load the db/5 Sources (snapshots or journals, as indicated by Type)
%   using at most Jobs threads.  The `restoring' progress message uses
%   message_level/2, so silent(true) suppresses it like the other
%   restore messages; it used to be printed unconditionally.

load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    message_level(Silent, Level),
    print_message(Level, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).
424
425
%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)
%
%   Goals is a list with one load_source(DB, Silent, Nth, Total) goal
%   per element of DBs, where Nth counts upward from Index.

make_goals([], _, _, _, []).
make_goals([DB|DBs], Silent, Nth, Total, [Goal|Goals]) :-
    Goal = load_source(DB, Silent, Nth, Total),
    Next is Nth + 1,
    make_goals(DBs, Silent, Next, Total, Goals).
433
%   verbosity(-Silent)
%
%   Silent is taken from the silent/1 option if given.  Otherwise it is
%   =true= when the Prolog flag verbose is =silent=, and =brief= if not.

verbosity(Silent) :-
    (   rdf_option(silent(Silent))
    ->  true
    ;   current_prolog_flag(verbose, silent)
    ->  Silent = true
    ;   Silent = brief
    ).
442
443
%!  concurrency(-Jobs)
%
%   Number of jobs to run concurrently: the concurrency/1 option if
%   given, else the positive value of the cpu_count flag, else 1.

concurrency(Jobs) :-
    (   rdf_option(concurrency(Jobs))
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs),
        Jobs > 0
    ->  true
    ;   Jobs = 1
    ).

%   cpu_stat_key(+Jobs, -Key)
%
%   statistics/2 key used to time loading: =cputime= for a single job
%   and =process_cputime= when several threads do the work.

cpu_stat_key(1, cputime) :-
    !.
cpu_stat_key(_, process_cputime).
459
460
%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
%
%   Scan the persistent database in Dir.  Graphs is the sorted set of
%   graph names found.  SnapBySize and JournalBySize are the snapshot
%   and journal files, both sorted by file size.  Each file is
%   described by a term
%
%     ==
%     db(Size, Ext, DB, AbsFile, Depth)
%     ==
%
%   If consider_reindex_db/3 decides that the directory layout must
%   change, the scan is repeated after reindexing.

find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Entries),
    phrase(scan_db_files(Entries, Dir, '.', 0), Found),
    maplist(db_graph, Found, GraphList),
    sort(GraphList, Graphs),
    (   consider_reindex_db(Dir, Graphs, Found)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)   % rescan
    ;   partition(db_is_snapshot, Found, Snaps, Jrns),
        sort(Snaps, SnapBySize),
        sort(Jrns, JournalBySize)
    ).
481
%   consider_reindex_db(+Dir, +Graphs, +Scanned) is semidet.
%
%   Succeeds after calling reindex_db/2 if the files are not all at the
%   same depth, or if they are but the number of graphs needs more
%   directory levels (floor(log256(Count))).  If all files are at a
%   depth that is deep enough, that depth is recorded as the
%   directory_levels/1 option and the predicate fails.

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, NGraphs),
    NGraphs > 0,
    Needed is floor(log(NGraphs)/log(256)),
    (   maplist(depth_db(Current), Scanned)
    ->  (   Needed > Current
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(Current))),
            fail
        )
    ;   true                        % mixed depths: always reindex
    ),
    reindex_db(Dir, Needed).
496
% Accessors for the db(Size, Ext, DB, AbsFile, Depth) terms produced by
% scan_db_files//4.

db_is_snapshot(DBTerm) :-           % snapshots have extension trp
    arg(2, DBTerm, trp).

db_graph(DBTerm, Graph) :-
    arg(3, DBTerm, Graph).

db_file_name(DBTerm, File) :-
    arg(4, DBTerm, File).

depth_db(Depth, DBTerm) :-
    arg(5, DBTerm, Depth).
508
%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
%
%   Produces a term db(Size, Ext, DB, AbsFile, Depth) for every file in
%   Files whose extension is a db_extension/1.  Files are names in the
%   directory Prefix, which is relative to the database directory Dir.
%   Sub-directories are scanned recursively with Depth + 1; symbolic
%   links are not followed, and "." and ".." are skipped.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { nofollow(Entry) },
    !,
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, Entry),
      db_extension(Ext),
      !,
      rdf_db_to_file(DB, Base),
      directory_file_path(Prefix, Entry, RelFile),
      directory_file_path(Dir, RelFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, DB, AbsFile, Depth) ],
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([Entry|Entries], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, Entry, SubDir),
      directory_file_path(Dir, SubDir, AbsSubDir),
      exists_directory(AbsSubDir),
      \+ read_link(AbsSubDir, _, _),    % Do not follow links
      !,
      directory_files(AbsSubDir, SubEntries),
      SubDepth is Depth + 1
    },
    scan_db_files(SubEntries, Dir, SubDir, SubDepth),
    scan_db_files(Entries, Dir, Prefix, Depth).
scan_db_files([_|Entries], Dir, Prefix, Depth) -->
    scan_db_files(Entries, Dir, Prefix, Depth).

nofollow('.').
nofollow('..').

db_extension(trp).                  % snapshot
db_extension(jrn).                  % journal
550
:- public load_source/4.                % called through make_goals/5

%   load_source(+DB, +Silent, +Nth, +Total)
%
%   Load a single db/5 term: a snapshot through rdf_load_db/1 or a
%   journal through load_journal/2.  Progress messages report the CPU
%   time used and the number of triples the graph gained.

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Before),
    statistics(cputime, CPU0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, CPU1),
    Time is CPU1 - CPU0,
    graph_triple_count(Graph, After),
    Added is After - Before,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, Time, Added, Nth, Total)))).

%   graph_triple_count(+Graph, -Count)
%
%   Count is the number of triples in Graph, or 0 if rdf_statistics/1
%   has no count for it.

graph_triple_count(Graph, Count) :-
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ).
577
578
%!  attach_graph(+Graph, +Options) is det.
%
%   Remove all triples of Graph, then load its snapshot (if present) and
%   replay its journal (if present).  Option silent(Bool) (default
%   =false=) controls the message level through message_level/2.
%
%   Previously the Silent argument of the messages was never bound, and
%   the journal was opened using its unresolved name.  The journal is
%   now resolved with db_file/2, just like the snapshot.

attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false),
    message_level(Silent, Level),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        db_file(JournalFile, AbsJournal),
        load_journal(AbsJournal, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).

%   message_level(+Silent, -Level)
%
%   Silent = true maps to message level =silent=; anything else maps to
%   =informational=.

message_level(true, silent) :- !.
message_level(_, informational).
615
616
617                 /*******************************
618                 *         LOAD JOURNAL         *
619                 *******************************/
620
%!  load_journal(+File:atom, +DB:atom) is det.
%
%   Create the named graph DB and replay the transactions in the UTF-8
%   encoded journal File into it.  The stream is always closed, also on
%   failure or exception.

load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        ( read(Stream, First),
          process_journal(First, Stream, DB)
        ),
        close(Stream)).
634
%   process_journal(+Term, +Stream, +DB)
%
%   Apply Term and every following term read from Stream to DB until
%   end_of_file.  A term that process_journal_term/2 does not handle
%   raises type_error(journal_term, Term).

process_journal(end_of_file, _, _) :- !.
process_journal(Term, Stream, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(Stream, Next),
    process_journal(Next, Stream, DB).

%   process_journal_term(+Term, +DB)
%
%   Replay one journal Term on graph DB.  Terms that carry a Line
%   number are applied to the source DB:Line.

process_journal_term(assert(Subj,Pred,Obj), DB) :-
    rdf_assert(Subj,Pred,Obj,DB).
process_journal_term(assert(Subj,Pred,Obj,Line), DB) :-
    rdf_assert(Subj,Pred,Obj,DB:Line).
process_journal_term(retract(Subj,Pred,Obj), DB) :-
    rdf_retractall(Subj,Pred,Obj,DB).
process_journal_term(retract(Subj,Pred,Obj,Line), DB) :-
    rdf_retractall(Subj,Pred,Obj,DB:Line).
process_journal_term(update(Subj,Pred,Obj,Action), DB) :-
    (   rdf_update(Subj,Pred,Obj,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(Subj,Pred,Obj,Action)))
    ).
process_journal_term(start(_), _).      % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).      % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
process_journal_term(end(_,_,_), _).
663
664
665                 /*******************************
666                 *         CREATE JOURNAL       *
667                 *******************************/
668
:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id

%!  rdf_persistency(+DB, +Bool)
%
%   Specify whether the named graph DB is persistent.  Switching to
%   =false= blocks DB and deletes its persistent state (unless it was
%   already blocked).  Switching to =true= unblocks DB and recreates
%   its persistent state if it was blocked.
%
%   @error type_error if DB is not an atom or Bool is not a boolean.

rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    (   Bool == false
    ->  (   blocked_db(DB, persistency)
        ->  true
        ;   assertz(blocked_db(DB, persistency)),
            delete_db(DB)
        )
    ;   (   retract(blocked_db(DB, persistency))
        ->  create_db(DB)
        ;   true
        )
    ).
695
%!  rdf_db:property_of_graph(?Property, +Graph) is nondet.
%
%   Hook that extends rdf_graph_property/2 with persistent(Bool): Bool
%   is =false= if persistency was switched off for Graph with
%   rdf_persistency/2 and =true= otherwise.

:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(Persistent), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  Persistent = false
    ;   Persistent = true
    ).
708
709
%!  start_monitor is det.
%!  stop_monitor is det.
%
%   Register monitor/1 with rdf_monitor/2 so that database changes are
%   written to the journals, or disable all its events.  When started,
%   the assert(load) events are excluded.

start_monitor :-
    rdf_monitor(monitor, [-assert(load)]).

stop_monitor :-
    rdf_monitor(monitor, [-all]).
724
%!  monitor(+Term) is semidet.
%
%   Handle an rdf_monitor/2 callback to deal with persistency.  Note
%   that the monitor calls that come from rdf_db.pl that deal with
%   database changes are serialized.  They do come from different
%   threads though.
%
%   Every clause that writes a change to a journal first calls
%   open_transaction/2, so that a pending logged transaction gets its
%   begin/4 header in that journal.  The update clause with a DB:Line
%   source used to skip this step.

monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).
                                        % TBD: Remove empty directories?

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).
797
%   monitor_transaction(+Id, +BeginOrEnd) is semidet.
%
%   Handle transaction(BeginOrEnd, Id) monitor events:
%     - load_journal(G) and parse(G)/unload(G) temporarily block
%       journalling for G; at the end of a parse the graph's persistent
%       state is created, and at the end of an unload it is deleted.
%     - log(Msg) and log(Msg, G) record a message for the current
%       nesting level (subject to check_nested/1).  At the end, end/3
%       terms are written to every graph the transaction touched.

monitor_transaction(load_journal(Graph), begin(_)) :-
    !,
    assertz(blocked_db(Graph, journal)).
monitor_transaction(load_journal(Graph), end(_)) :-
    !,
    retractall(blocked_db(Graph, journal)).

monitor_transaction(parse(Graph), begin(_)) :-
    !,
    (   blocked_db(Graph, persistency)
    ->  true
    ;   assertz(blocked_db(Graph, parse))
    ).
monitor_transaction(parse(Graph), end(_)) :-
    !,
    (   retract(blocked_db(Graph, parse))
    ->  create_db(Graph)
    ;   true
    ).
monitor_transaction(unload(Graph), begin(_)) :-
    !,
    (   blocked_db(Graph, persistency)
    ->  true
    ;   assertz(blocked_db(Graph, unload))
    ).
monitor_transaction(unload(Graph), end(_)) :-
    !,
    (   retract(blocked_db(Graph, unload))
    ->  delete_db(Graph)
    ;   true
    ).
monitor_transaction(log(Message), begin(Level)) :-
    !,
    check_nested(Level),
    get_time(Now),
    asserta(transaction_message(Level, Now, Message)).
monitor_transaction(log(_), end(Level)) :-
    check_nested(Level),
    retract(transaction_message(Level, _, _)),
    !,
    findall(Graph:Id, retract(transaction_db(Level, Graph, Id)), Touched),
    end_transactions(Touched, Level).
monitor_transaction(log(Message, Graph), begin(Level)) :-
    !,
    check_nested(Level),
    get_time(Now),
    asserta(transaction_message(Level, Now, Message)),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd).
monitor_transaction(log(Message, _Graph), end(Level)) :-
    monitor_transaction(log(Message), end(Level)).
849
850
%!  check_nested(+Level) is semidet.
%
%   True if a log transaction at nesting Level must be journalled: the
%   toplevel (Level 0) always is; deeper levels only if the option
%   log_nested_transactions(true) was given.

check_nested(Level) :-
    (   Level = 0
    ->  true
    ;   rdf_option(log_nested_transactions(true))
    ).
860
861
%!  open_transaction(+DB, +Fd) is det.
%
%   If a logged transaction is pending (the most recent
%   transaction_message/3), write begin(Id, Level, Time, Message) to
%   the journal Fd of DB.  This happens only once per nesting level and
%   DB.  Id is an incremental integer, where each database has its own
%   counter.  Level is the nesting level, Time a floating point
%   timestamp rounded to 1/100 sec., and Message the message provided
%   as argument to the log transaction.

open_transaction(DB, Fd) :-
    (   transaction_message(Level, Time, Msg)
    ->  (   transaction_db(Level, DB, _)
        ->  true                    % already opened in this journal
        ;   next_transaction_id(DB, Id),
            assertz(transaction_db(Level, DB, Id)),
            Stamp is round(Time*100)/100,
            format(Fd, '~q.~n', [begin(Id, Level, Stamp, Msg)])
        )
    ;   true                        % no logged transaction pending
    ).
881
882
%!  next_transaction_id(+DB, -Id) is det.
%
%   Id is the number to use for the next logged transaction on DB.
%   Transactions in each named graph are numbered in sequence.  The
%   last number is cached in current_transaction_id/2.  Without a
%   cached value, the last id is recovered from the journal using
%   iterative_expand/3, which scans growing sections from the end of
%   the file.
%
%   The journal's existence and size are checked on the path resolved
%   by db_file/2, as rdf_flush_journal/2 does.  Before, they were
%   checked on the unresolved name, so an existing journal could be
%   missed and numbering restarted at 1.

:- dynamic
    current_transaction_id/2.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    db_file(Journal, JournalPath),
    exists_file(JournalPath),
    !,
    size_file(JournalPath, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).
910
%   iterative_expand(+Stream, +Bytes, -LastId)
%
%   LastId is the Id of the last end/3 record on the journal Stream,
%   whose size is Bytes, or 0 if there is none.  Tail windows of
%   2^10, 2^11, ... bytes are tried first; only if none of them yields
%   an Id above 0 is the whole stream scanned from the start.

iterative_expand(_, 0, 0) :- !.
iterative_expand(Stream, Bytes, LastId) :-      % growing tail windows
    Log2 is floor(log(Bytes)/log(2)),
    between(10, Log2, Exponent),
    Window is -(1<<Exponent),
    seek(Stream, Window, eof, _),
    skip(Stream, 0'\n),                         % resync on a line boundary
    read(Stream, First),
    last_transaction_id(First, Stream, 0, LastId),
    LastId > 0,
    !.
iterative_expand(Stream, _, LastId) :-          % full scan fallback
    seek(Stream, 0, bof, _),
    read(Stream, First),
    last_transaction_id(First, Stream, 0, LastId).
926
%   last_transaction_id(+Term, +In, +Id0, -Last) is det.
%
%   Consume the terms of In, starting with the already read Term, and
%   unify Last with the Id of the last end(Id, _, _) record seen, or
%   Id0 if there is none.  The predicate leaves no choicepoints.  A
%   choicepoint here would let backtracking resume reading from the
%   current stream position and yield a stale Id.  It would also
%   postpone the cleanup of a surrounding call_cleanup/2.

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(Term, In, Id0, Last) :-
    (   Term = end(Id, _, _)
    ->  Id1 = Id
    ;   Id1 = Id0
    ),
    read(In, Next),
    last_transaction_id(Next, In, Id1, Last).
934
935
%!  end_transactions(+DBs:list(atom:id), +Nesting) is det.
%
%   End a transaction that affected the given list of databases.  We
%   write the list of other affected databases as an argument to the
%   end-term to facilitate fast finding of the related transactions.
%
%   In each database, the transaction is ended with a term end(Id,
%   Nesting, Others), where Id and Nesting are the transaction
%   identifier and nesting (see open_transaction/2), and Others is the
%   list of DB:Id pairs of the other databases affected by the
%   transaction.  After writing, sync_journal/2 is called for each
%   journal.

end_transactions(DBs, N) :-
    end_transactions(DBs, DBs, N).

end_transactions([], _, _).
end_transactions([DB:Id|T], DBs, N) :-
    journal_fd(DB, Fd),
    selectchk(DB:Id, DBs, Others),      % every entry except this one
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
    sync_journal(DB, Fd),
    end_transactions(T, DBs, N).
958
959
%!  sync_loaded_graphs(+Graphs)
%
%   Called after binary triples have been loaded that added triples
%   to Graphs.  Each graph is passed to create_db/1, in list order.

sync_loaded_graphs([]).
sync_loaded_graphs([Graph|Graphs]) :-
    create_db(Graph),
    sync_loaded_graphs(Graphs).
967
968
969                 /*******************************
970                 *         JOURNAL FILES        *
971                 *******************************/
972
%!  journal_fd(+DB, -Stream) is det.
%
%   Get an open stream to the journal of DB.  If the journal is not
%   open, old journals are first closed to satisfy the
%   =max_open_journals= option (limit_fd_pool/0).  The journal is then
%   opened in =append= mode through open_db/4, which uses UTF-8
%   encoding, and a start([time(Now)]) record is written.  The lookup
%   is repeated while holding the rdf_journal_file mutex, so a journal
%   opened by a concurrent thread in the meantime is reused.

journal_fd(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   with_mutex(rdf_journal_file, journal_fd_(DB, Stream)),
        Fd = Stream
    ).

journal_fd_(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   limit_fd_pool,
        db_files(DB, _Snapshot, Journal),
        open_db(Journal, append, Fd, [close_on_abort(false)]),
        time_stamp(Now),
        format(Fd, '~q.~n', [start([time(Now)])]),
        assert(source_journal_fd(DB, Fd))   % newest journal goes last
    ).
1001
%!  limit_fd_pool is det.
%
%   Make room for one more journal.  Close the oldest open journals so
%   that, once the caller has opened its new journal, at most
%   max_open_journals (default 10) journals are open.  Note that calls
%   from rdf_monitor/2 are issued in different threads, but as they
%   are part of write operations they are fully synchronised.

limit_fd_pool :-
    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
    !,
    (   rdf_option(max_open_journals(Max))
    ->  true
    ;   Max = 10
    ),
    Close is N - Max + 1,           % +1: the journal about to be opened
    forall(between(1, Close, _),
           close_oldest_journal).
limit_fd_pool.
1020
%   close_oldest_journal
%
%   Close the journal of the first source_journal_fd/2 entry.  That is
%   the least recently opened one, because new entries are appended.
%   Succeeds without action if no journal is open.

close_oldest_journal :-
    (   source_journal_fd(Oldest, _Stream)
    ->  debug(rdf_persistency, 'Closing old journal for ~q', [Oldest]),
        close_journal(Oldest)
    ;   true
    ).
1027
1028
%!  sync_journal(+DB, +Fd)
%
%   Flush the journal stream Fd of DB.  While DB is registered in
%   transaction_db/3 the flush is skipped: there is no point flushing
%   until the end of the transaction.

sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).
1040
%!  close_journal(+DB) is det.
%
%   Close the journal associated with DB if it is open.  An
%   end([time(Now)]) record is written first.  The stream is closed
%   even if writing that record raises an exception.  Its
%   source_journal_fd/2 entry has already been retracted at that
%   point, so the stream would otherwise be leaked.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    (   retract(source_journal_fd(DB, Fd))
    ->  time_stamp(Now),
        call_cleanup(format(Fd, '~q.~n', [end([time(Now)])]),
                     close(Fd, [force(true)]))
    ;   true
    ).
1056
%!  close_journals
%
%   Close all open journals.  An exception raised while closing one
%   journal is printed as an error and does not stop the others from
%   being closed.

close_journals :-
    findall(DB, source_journal_fd(DB, _), OpenDBs),
    forall(member(Open, OpenDBs),
           catch(close_journal(Open), Error,
                 print_message(error, Error))).
1065
%!  create_db(+Graph)
%
%   Create a saved version of Graph in corresponding file, close and
%   delete journals.
%
%   If Graph has no triples, its files are removed with delete_db/1.
%   Otherwise the journal is closed and Graph is saved to a temporary
%   `<snapshot>.new` file.  Only if that succeeds is the journal
%   deleted and the temporary file renamed over the snapshot.  If
%   saving fails or raises (the error is printed as a warning), the
%   temporary file is removed, ignoring errors, and create_db/1 still
%   succeeds.

create_db(Graph) :-
    \+ rdf(_,_,_,Graph),
    !,
    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
    delete_db(Graph).
create_db(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, Snapshot, Journal),
    atom_concat(Snapshot, '.new', NewSnapshot),
    (   catch(( create_directory_levels(Snapshot),
                rdf_save_db(NewSnapshot, Graph)
              ), Error,
              ( print_message(warning, Error),
                fail
              ))
        % Presumably the new snapshot reflects all journalled changes,
        % making the journal redundant.
        % NOTE(review): the journal is deleted before the rename; a crash
        % between the two leaves the old snapshot without its journal.
        % Confirm this ordering is intended.
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(NewSnapshot, Snapshot),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
        % Best-effort cleanup of a partially written snapshot.
    ;   catch(delete_file(NewSnapshot), _, true)
    ).
1095
1096
%!  delete_db(+DB)
%
%   Remove snapshot and journal file for DB while holding the
%   rdf_journal_file mutex.  The journal is closed first.  If it cannot
%   be closed or the file names of DB cannot be determined, nothing
%   is deleted and the call still succeeds.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    (   close_journal_(DB),
        db_abs_files(DB, Snapshot, Journal)
    ->  forall(member(Path, [Journal, Snapshot]),
               (   exists_file(Path)
               ->  delete_file(Path)
               ;   true
               ))
    ;   true
    ).
1118
1119                 /*******************************
1120                 *             LOCKING          *
1121                 *******************************/
1122
%!  lock_db(+Dir)
%
%   Lock the database directory Dir.  The lock file is opened with a
%   non-blocking write lock; a permission error from open/4 is
%   translated by locked_error/2.  On success the file is overwritten
%   with a locked([time(_), pid(_), host(_)]) term.  The lock is
%   recorded in rdf_lock/2, and unlock_db/1 is registered to run at
%   halt.

lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Out, [lock(write), wait(false)]),
          error(permission_error(Action, _, _), _),
          locked_error(Action, Dir)),
    (   current_prolog_flag(pid, Pid0)
    ->  Pid = Pid0
    ;   Pid = 0                     % TBD: Fix in Prolog
    ),
    time_stamp(Stamp),
    gethostname(Host),
    LockInfo = locked([ time(Stamp),
                        pid(Pid),
                        host(Host)
                      ]),
    format(Out, '/* RDF Database is in use */~n~n', []),
    format(Out, '~q.~n', [LockInfo]),
    flush_output(Out),
    set_end_of_stream(Out),
    assert(rdf_lock(Dir, lock(Out, LockFile))),
    at_halt(unlock_db(Dir)).
1148
%   locked_error(+Action, +Dir)
%
%   Throw permission_error(lock, rdf_db, Dir).  For Action = lock, if
%   the lock file holds exactly one locked/1 term, its argument is
%   passed as rdf_locked/1 context so the message can show the holder.

locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Terms, []), _, fail),
        Terms = [locked(Info)]
    ->  Context = rdf_locked(Info)
    ;   Context = context(_, 'Database is in use')
    ),
    Error = error(permission_error(lock, rdf_db, Dir), Context),
    throw(Error).
locked_error(open, Dir) :-
    Error = error(permission_error(lock, rdf_db, Dir),
                  context(_, 'Lock file cannot be opened')),
    throw(Error).
1160
%!  unlock_db(+Dir) is det.
%!  unlock_db(+Stream, +File) is det.
%
%   Release a lock taken by lock_db/1: close the lock stream and
%   delete the lock file.  unlock_db/1 succeeds without action if no
%   rdf_lock/2 entry exists for Dir.

unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Stream, LockFile)))
    ->  unlock_db(Stream, LockFile)
    ;   true
    ).

unlock_db(Stream, LockFile) :-
    close(Stream),
    delete_file(LockFile).
1173
1174                 /*******************************
1175                 *           FILENAMES          *
1176                 *******************************/
1177
%   lockfile(+Dir, -LockFile)
%
%   LockFile is the path of the file named `lock` inside Dir.

lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, lock], /, LockFile).
1180
%   directory_levels(-Levels)
%
%   Levels is the number of intermediate hash directories: the value
%   of the directory_levels(Levels) rdf_option/1 if set, else 2.
%   NOTE(review): intended for mode (-); called with Levels = 2, the
%   default clause succeeds even when another level is configured.

directory_levels(Levels) :-
    rdf_option(directory_levels(Levels)),
    !.
directory_levels(2).
1185
%   db_file(+Base, -File)
%   db_file(+Dir, +Base, +Levels, -File)
%
%   File is the path of the store file Base: the store directory Dir
%   (rdf_directory/1 for db_file/2), then Levels hash directories from
%   dir_levels/4, then Base itself, joined with `/`.

db_file(Base, File) :-
    rdf_directory(StoreDir),
    directory_levels(Depth),
    db_file(StoreDir, Base, Depth, File).

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, PathParts, [Base]),
    atomic_list_concat([Dir|PathParts], /, File).
1194
%   open_db(+Base, +Mode, -Stream, +Options)
%
%   Open the store file Base (see db_file/2) in Mode as a UTF-8
%   stream, after ensuring its directory exists.  Options are passed
%   on to open/4.

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, Path),
    create_directory_levels(Path),
    OpenOptions = [encoding(utf8)|Options],
    open(Path, Mode, Stream, OpenOptions).
1199
%   create_directory_levels(+File)
%
%   Make sure the directory holding File exists, unless the store is
%   configured with directory_levels(0), in which case nothing is
%   created.

create_directory_levels(File) :-
    (   rdf_option(directory_levels(0))
    ->  true
    ;   file_directory_name(File, Parent),
        make_directory_path(Parent)
    ).
1206
%   exists_db(+Base) is semidet.
%
%   True if the store file Base (see db_file/2) exists.

exists_db(Base) :-
    db_file(Base, Path),
    exists_file(Path).
1210
%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
%
%   Segments is a difference list, closed by Tail, of Levels
%   intermediate directory names for File.  The names are consecutive
%   two-character pieces of the hash of File from rdf_atom_md5/3, taken
%   from its start (hexadecimal digits).

dir_levels(_, 0, Tail, Tail) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Digest),
    create_dir_levels(Levels, 0, Digest, Segments, Tail).

create_dir_levels(0, _, _, Tail, Tail) :- !.
create_dir_levels(Remaining, Offset, Digest, [Dir|Dirs], Tail) :-
    sub_atom(Digest, Offset, 2, _, Dir),
    NextOffset is Offset + 2,
    Remaining1 is Remaining - 1,
    create_dir_levels(Remaining1, NextOffset, Digest, Dirs, Tail).
1227
1228
%!  db_files(+DB, -Snapshot, -Journal).
%!  db_files(-DB, +Snapshot, -Journal).
%!  db_files(-DB, -Snapshot, +Journal).
%
%   True if named graph DB is represented by the files Snapshot
%   (`.trp`) and Journal (`.jrn`).  The filenames are local to the
%   directory representing the store.  Fails if all three arguments
%   are unbound.

db_files(DB, Snapshot, Journal) :-
    (   nonvar(DB)
    ->  rdf_db_to_file(DB, Base),
        atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal)
    ;   nonvar(Snapshot)
    ->  atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal),
        rdf_db_to_file(DB, Base)
    ;   nonvar(Journal)
    ->  atom_concat(Base, '.jrn', Journal),
        atom_concat(Base, '.trp', Snapshot),
        rdf_db_to_file(DB, Base)
    ).
1255
%   db_abs_files(?DB, -Snapshot, -Journal)
%
%   As db_files/3, but Snapshot and Journal include the store directory
%   and hash directories (see db_file/2).

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, LocalSnapshot, LocalJournal),
    db_file(LocalSnapshot, Snapshot),
    db_file(LocalJournal, Journal).
1260
1261
%!  rdf_journal_file(+Graph, -File) is semidet.
%!  rdf_journal_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing journal file for Graph.
%   If Graph is unbound, it is enumerated with rdf_graph/1.

rdf_journal_file(Graph, Journal) :-
    (   nonvar(Graph)
    ->  true
    ;   rdf_graph(Graph)
    ),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).
1274
1275
%!  rdf_snapshot_file(+Graph, -File) is semidet.
%!  rdf_snapshot_file(-Graph, -File) is nondet.
%
%   True if File is the name of the existing snapshot file for Graph.
%   If Graph is unbound, it is enumerated with rdf_graph/1, which also
%   picks the empty graphs.

rdf_snapshot_file(Graph, Snapshot) :-
    (   nonvar(Graph)
    ->  true
    ;   rdf_graph(Graph)
    ),
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).
1288
1289
%!  rdf_db_to_file(+DB, -File) is det.
%!  rdf_db_to_file(-DB, +File) is det.
%
%   Translate between database encoding (often a file or URL) and
%   the name we store in the directory.  We keep a cache in
%   file_base_db/2 for two reasons.  One is speed.  Much more
%   important, the raw --> encoded mapping computed by
%   url_to_filename/2 is not guaranteed to be unique, so a name we
%   produced must keep mapping back to its DB.

rdf_db_to_file(DB, File) :-
    (   file_base_db(File, DB)
    ->  true
    ;   url_to_filename(DB, File),
        assert(file_base_db(File, DB))
    ).
1305
%!  url_to_filename(+URL, -FileName) is det.
%!  url_to_filename(-URL, +FileName) is det.
%
%   Turn a valid URL into a filename.  Earlier versions used
%   www_form_encode/2, but this can produce characters that are not
%   valid in filenames.  We use the same encoding as
%   www_form_encode/2, but with our own rules for allowed characters.
%   The only requirement is that we avoid any filename special
%   character in use.  The current encoding uses US-ASCII alnum
%   characters, `_`, `+` (for space) and `%`.  In mode (-,+) the
%   filename is decoded with uri_encoded/3 as a path component.

url_to_filename(URL, FileName) :-
    (   atomic(URL)
    ->  atom_codes(URL, URLCodes),
        phrase(url_encode(FileCodes), URLCodes),
        atom_codes(FileName, FileCodes)
    ;   uri_encoded(path, URL, FileName)
    ).
1325
%   url_encode(-Encoded)//
%
%   Encode the parsed code list into the code list Encoded:
%
%     - a space becomes `+`
%     - US-ASCII letters and digits (alphanum//1) and `_` are kept
%     - "\r\n" and a lone "\n" both become `%0D%0A`
%     - any other code C becomes `%` followed by two hex digits built
%       from bits 4-7 and bits 0-3 of C.
%
%   NOTE(review): only the low 8 bits of C are encoded, so codes above
%   255 lose information and distinct graph names can map to the same
%   file name.  Changing this alters the file names of existing
%   stores; confirm before fixing.

% Space is encoded as `+`.
url_encode([0'+|T]) -->
    " ",
    !,
    url_encode(T).
url_encode([C|T]) -->
    alphanum(C),
    !,
    url_encode(T).
url_encode([C|T]) -->
    no_enc_extra(C),
    !,
    url_encode(T).
% Both CRLF and LF are written as %0D%0A.
url_encode(Enc) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", Codes),
      append(Codes, T, Enc)
    },
    url_encode(T).
url_encode([]) -->
    eos,
    !.
% Fallback: %XY using only the low byte of C (see NOTE above).
url_encode([0'%,D1,D2|T]) -->
    [C],
    { Dv1 is (C>>4 /\ 0xf),
      Dv2 is (C /\ 0xf),
      code_type(D1, xdigit(Dv1)),
      code_type(D2, xdigit(Dv2))
    },
    url_encode(T).
1358
% eos//0: matches only at the end of the input list.
eos([], []).

% alphanum(-C)//: C is a US-ASCII (code < 128) letter or digit.
alphanum(C) -->
    [C],
    { C < 128,                      % US-ASCII
      code_type(C, alnum)
    }.

% no_enc_extra(-C)//: non-alphanumeric characters that are kept
% unencoded; currently only `_`.
no_enc_extra(0'_) --> "_".
1368
1369
1370                 /*******************************
1371                 *             REINDEX          *
1372                 *******************************/
1373
%!  reindex_db(+Dir, +Levels)
%
%   Reindex the database in Dir for Levels intermediate directories.
%   Every file with a db_extension/1 extension that is not at depth
%   Levels is moved to the location given by db_file/4.  Afterwards,
%   empty directories are removed.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Entries),
    reindex_files(Entries, Dir, '.', 0, Levels),
    remove_empty_directories(Entries, Dir).
1382
%   reindex_files(+Entries, +Dir, +Prefix, +CLevel, +Levels)
%
%   Process the directory entries Entries of Dir/Prefix, which lies at
%   depth CLevel below the store directory Dir.  A file whose extension
%   satisfies db_extension/1 is renamed to its db_file/4 path for
%   Levels, unless CLevel already equals Levels.  Subdirectories that
%   are not symbolic links are processed recursively at CLevel+1.
%   Entries for which nofollow/1 holds, and anything else, are skipped.

reindex_files([], _, _, _, _).
reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
    nofollow(Nofollow),
    !,
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
% A DB file at the wrong depth: move it to its new location.
reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, File),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, File, DBFile),
    directory_file_path(Dir, DBFile, OldPath),
    db_file(Dir, File, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
% A real subdirectory: descend one level.
reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, D, SubD),
    directory_file_path(Dir, SubD, AbsD),
    exists_directory(AbsD),
    \+ read_link(AbsD, _, _),      % Do not follow links
    !,
    directory_files(AbsD, SubFiles),
    CLevel2 is CLevel + 1,
    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
% Anything else is left alone.
reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
1413
1414
%   remove_empty_directories(+Entries, +Dir)
%
%   Delete every directory among Entries (names relative to Dir) that
%   is empty, ignoring entries for which nofollow/1 holds.  Symbolic
%   links are not followed.  Subdirectories are pruned before their
%   parent is checked, so a directory that only contained empty
%   directories is removed as well.

remove_empty_directories([], _).
remove_empty_directories([File|Files], Dir) :-
    \+ nofollow(File),
    directory_file_path(Dir, File, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content),
    exclude(nofollow, Content, RealContent),
    remove_empty_directories(RealContent, Path),   % prune children first
    directory_files(Path, Remaining0),
    exclude(nofollow, Remaining0, Remaining),
    (   Remaining == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   true
    ),
    remove_empty_directories(Files, Dir).
remove_empty_directories([_|Files], Dir) :-
    remove_empty_directories(Files, Dir).
1432
1433
1434                 /*******************************
1435                 *            PREFIXES          *
1436                 *******************************/
1437
%   save_prefixes(+Dir)
%
%   Write all currently defined RDF prefixes to Dir/prefixes.db as a
%   UTF-8 file (see write_prefixes/1).  The stream is always closed.

save_prefixes(Dir) :-
    atomic_list_concat([Dir, 'prefixes.db'], /, PrefixFile),
    setup_call_cleanup(
        open(PrefixFile, write, Out, [encoding(utf8)]),
        write_prefixes(Out),
        close(Out)).
1443
%   write_prefixes(+Out)
%
%   Write a header comment followed by one prefix(Alias, URI) fact per
%   prefix enumerated by rdf_current_ns/2.

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    findall(A-U, rdf_current_ns(A, U), Pairs),
    forall(member(Alias-URI, Pairs),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
1448
%!  load_prefixes(+RDFDBDir) is det.
%
%   If the file RDFDBDir/prefixes.db exists, load the prefixes.  The
%   prefixes are registered using rdf_register_ns/3.  Errors caused by
%   changed prefix definitions are printed as warnings, retaining the
%   old definition.  Note that changing prefixes generally requires
%   reloading all RDF from the source.

load_prefixes(Dir) :-
    atomic_list_concat([Dir, 'prefixes.db'], /, PrefixFile),
    (   \+ exists_file(PrefixFile)
    ->  true
    ;   setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
                           read_prefixes(In),
                           close(In))
    ).
1465
%   read_prefixes(+Stream)
%
%   Read prefix(Alias, URI) terms from Stream up to end of file and
%   register each with rdf_register_ns/3.  Alias and URI must be atoms.
%   A registration error is printed as a warning.  Any other term
%   raises domain_error(prefix_term, Term).

read_prefixes(In) :-
    read_term(In, First, []),
    read_prefixes(First, In).

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), In) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []), Error,
          print_message(warning, Error)),
    read_term(In, Next, []),
    read_prefixes(Next, In).
read_prefixes(Other, _) :-
    domain_error(prefix_term, Other).
1481
1482
1483                 /*******************************
1484                 *              UTIL            *
1485                 *******************************/
1486
%!  mkdir(+Directory)
%
%   Create Directory unless it already exists.  Only the last path
%   component is created (make_directory/1).

mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true
    ;   make_directory(Directory)
    ).
1496
%!  time_stamp(-Integer)
%
%   Integer is the current time from get_time/1, rounded to the
%   nearest integer.

time_stamp(Int) :-
    get_time(Stamp),
    Int is round(Stamp).
1504
1505
1506                 /*******************************
1507                 *            MESSAGES          *
1508                 *******************************/
1509
1510:- multifile
1511    prolog:message/3,
1512    prolog:message_context/3.
1513
% Message hook: messages of this module are wrapped as rdf(Term) and
% translated by message//1.
prolog:message(rdf(Term)) -->
    message(Term).
1516
%   message(+Term)//
%
%   Translate the rdf(Term) messages of this module.  print_message/2
%   uses the first solution.  Therefore the restore/2 clauses for
%   specific actions (snapshot, journal, done) must come before the
%   catch-all restore(_, Graph) clause.  Placed earlier, the catch-all
%   would print "Restoring snapshot(...) ..." for every progress step.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
% attach_graph/2: start restoring Graph.  Matches any action, so it
% must remain the last restore/2 clause.
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].
1557
% silent_message(+Action)//: emits nothing.
silent_message(_) --> [].

% brief_message(+Action)//: for done/5 emit a one-line progress report
% with the base name of the graph; emit nothing for other actions.
brief_message(done(Graph, _, _, Nth, Total)) -->
    { file_base_name(Graph, BaseName) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[BaseName, Nth, Total],
      flush
    ].
brief_message(_) --> [].
1567
1568
% Context for a lock error: show when, and by which process id, the
% database was locked (taken from the locked/1 term in the lock file).
prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Stamp), Args),
      memberchk(pid(Pid), Args),
      format_time(string(When), '%+', Stamp)
    },
    [ nl,
      'locked at ~s by process id ~w'-[When, Pid]
    ].
1577