1# POPFILE LOADABLE MODULE
2package POPFile::History;
3
4use POPFile::Module;
5@ISA = ("POPFile::Module");
6
7#----------------------------------------------------------------------------
8#
9# This module handles POPFile's history.  It manages entries in the POPFile
10# database and on disk that store messages previously classified by POPFile.
11#
12# Copyright (c) 2001-2011 John Graham-Cumming
13#
14#   This file is part of POPFile
15#
16#   POPFile is free software; you can redistribute it and/or modify it
17#   under the terms of version 2 of the GNU General Public License as
18#   published by the Free Software Foundation.
19#
20#   POPFile is distributed in the hope that it will be useful,
21#   but WITHOUT ANY WARRANTY; without even the implied warranty of
22#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23#   GNU General Public License for more details.
24#
25#   You should have received a copy of the GNU General Public License
26#   along with POPFile; if not, write to the Free Software
27#   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28#
29#----------------------------------------------------------------------------
30
31use strict;
32use warnings;
33use locale;
34
35use Date::Parse;
36use Digest::MD5 qw( md5_hex );
37
# Column list used by get_slot_fields when selecting a single history row.
# The row is returned as a plain list, so callers index it by position
# (0-based) in exactly this order:
#
#    0 history.id     1 hdr_from    2 hdr_to             3 hdr_cc
#    4 hdr_subject    5 hdr_date    6 hash               7 inserted
#    8 buckets.name   9 usedtobe   10 history.bucketid  11 magnets.val
#   12 size
#
# revert_slot_classification reads index 9 and change_slot_classification
# reads index 10, so this list and those indices must be kept in step.
my $fields_slot =                                              # PROFILE BLOCK START
'history.id, hdr_from, hdr_to, hdr_cc, hdr_subject, hdr_date, hash, inserted,
 buckets.name, usedtobe, history.bucketid, magnets.val, size'; # PROFILE BLOCK STOP
41
42#----------------------------------------------------------------------------
43# new
44#
45#   Class new() function
46#----------------------------------------------------------------------------
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = POPFile::Module->new();

    # Queue of history items waiting to be written to the database.
    # Each element is a reference to the payload of a COMIT message
    # as queued by deliver(): the session, the slot id, the bucket
    # classified to and the magnet if used.
    #
    # This is an explicit empty array reference: assigning () to a
    # scalar slot would store undef and rely on autovivification.

    $self->{commit_list__} = [];

    # Contains queries started with start_query and consists
    # of a mapping between unique IDs and quadruples containing
    # a reference to the SELECT and a cache of already fetched
    # rows and a total row count.  These quadruples are implemented
    # as a sub-hash with keys query, count, cache, fields

    $self->{queries__} = {};

    # Set until service() has been called for the first time

    $self->{firsttime__} = 1;

    # Will contain the database handle retrieved from
    # Classifier::Bayes (see db__)

    $self->{db__} = undef;

    # Placeholder for the classifier object used by db__ and the
    # commit/reclassify methods.  It is not assigned anywhere in
    # this constructor.

    $self->{classifier__} = 0;

    bless($self, $class);

    $self->name( 'history' );

    return $self;
}
83
84#----------------------------------------------------------------------------
85#
86# initialize
87#
88# Called to initialize the history module
89#
90#----------------------------------------------------------------------------
sub initialize
{
    my $self = shift;

    # Default configuration values, registered in this order:
    #
    # history_days     Keep the history for two days
    #
    # archive          If 1, messages are saved to an archive when they
    #                  are removed or expired from the history cache
    #
    # archive_dir      The directory where messages will be archived to,
    #                  in sub-directories for each bucket
    #
    # archive_classes  Advanced setting: if greater than zero archived
    #                  files are saved to a randomly numbered
    #                  sub-directory ( 0 <= name < archive_classes ),
    #                  otherwise they go straight in the bucket directory

    my @defaults = (                         # PROFILE BLOCK START
        [ 'history_days',    2         ],
        [ 'archive',         0         ],
        [ 'archive_dir',     'archive' ],
        [ 'archive_classes', 0         ],
    );                                       # PROFILE BLOCK STOP

    foreach my $default (@defaults) {
        $self->config_( @{$default} );
    }

    # TICKD is needed for history clean up, COMIT arrives when a message
    # is committed to the history

    foreach my $type ( 'TICKD', 'COMIT' ) {
        $self->mq_register_( $type, $self );
    }

    return 1;
}
125
126#----------------------------------------------------------------------------
127#
128# stop
129#
130# Called to stop the history module
131#
132#----------------------------------------------------------------------------
sub stop
{
    my $self = shift;

    # A stop can arrive after items have been queued but before
    # service() has had a chance to run, so flush the queue now

    $self->commit_history__();

    # Only disconnect if a handle was ever obtained through db__

    if ( defined( my $dbh = $self->{db__} ) ) {
        $dbh->disconnect;
        $self->{db__} = undef;
    }
}
148
149#----------------------------------------------------------------------------
150#
151# db__
152#
153# Since we don't know the order in which the start() methods of PLMs
154# is called we cannot be sure that Classifier::Bayes will have started
155# and connected to the database before us, hence we can't set our
156# database handle at start time.  So instead we access the db handle
157# through this method
158#
159#----------------------------------------------------------------------------
sub db__
{
    my $self = shift;

    # Already have our own handle: hand it back

    return $self->{db__} if ( defined( $self->{db__} ) );

    # First use: take a clone of the classifier's database handle and
    # remember it for subsequent calls

    $self->{db__} = $self->{classifier__}->db()->clone;

    return $self->{db__};
}
170
171#----------------------------------------------------------------------------
172#
173# service
174#
175# Called periodically so that the module can do its work
176#
177#----------------------------------------------------------------------------
sub service
{
    my $self = shift;

    # The very first call runs the history upgrade step; the flag is
    # cleared afterwards so that it only ever runs once

    if ( $self->{firsttime__} ) {
        $self->upgrade_history_files__();
        $self->{firsttime__} = 0;
    }

    # Write any queued history items to the database.
    #
    # Note when we go to multiuser POPFile we'll need to change this call
    # so that we are sure that the session IDs that it is using are still
    # valid.  The easiest way will be to call it in deliver() when we get
    # a COMIT message.

    $self->commit_history__();

    return 1;
}
196
197#----------------------------------------------------------------------------
198#
199# deliver
200#
201# Called by the message queue to deliver a message
202#
203# There is no return value from this method
204#
205#----------------------------------------------------------------------------
sub deliver
{
    my ( $self, $type, @message ) = @_;

    # TICKD: a day has passed, so clean up the history

    $self->cleanup_history() if ( $type eq 'TICKD' );

    # COMIT: remember the message payload; it is written to the
    # database the next time commit_history__ runs

    push( @{$self->{commit_list__}}, \@message ) if ( $type eq 'COMIT' );
}
220
221# ---------------------------------------------------------------------------
222#
223# forked
224#
# This is called inside a child process that has just forked.  The handle
# held by this object is dropped so that the next call to db__() obtains
# a fresh database handle for the child
227#
228# ---------------------------------------------------------------------------
sub forked
{
    my $self = shift;

    # Forget the current handle; db__() obtains a new one the next
    # time the database is needed

    undef $self->{db__};
}
235
236#----------------------------------------------------------------------------
237#
238# ADDING TO THE HISTORY
239#
240# To add a message to the history the following sequence of calls
241# is made:
242#
243# 1. Obtain a unique ID and filename for the new message by a call
244#    to reserve_slot
245#
246# 2. Write the message into the filename returned
247#
248# 3. Call commit_slot with the bucket into which the message was
249#    classified
250#
251# If an error occurs after #1 and the slot is unneeded then call
252# release_slot
253#
254#----------------------------------------------------------------------------
255#
256# FINDING A HISTORY ENTRY
257#
258# 1. If you know the slot id then call get_slot_file to obtain
259#    the full path where the file is stored
260#
# 2. If you know the message hash then call get_slot_from_hash
#    to get the slot id
263#
264# 3. If you know the message headers then use get_message_hash
265#    to get the hash
266#
267#----------------------------------------------------------------------------
268
269#----------------------------------------------------------------------------
270#
271# reserve_slot
272#
273# Called to reserve a place in the history for a message that is in the
274# process of being received.  It returns a unique ID for this slot and
275# the full path to the file where the message should be stored.  The
276# caller is expected to later call either release_slot (if the slot is not
277# going to be used) or commit_slot (if the file has been written and the
278# entry should be added to the history).
279#
280# The only parameter is optional and exists for the sake of the test-
281# suite: you can pass in the time at which the message was inserted,
282# ie. the time at which the message arrived.
283#----------------------------------------------------------------------------
sub reserve_slot
{
    my ( $self, $inserted_time ) = @_;

    # The insertion time defaults to now; it is stored in the database
    # so that we can sort on when we received the message as well as on
    # its Date: header

    $inserted_time ||= time;

    my $dbh = $self->db__();

    my $insert_sth = $dbh->prepare(                                   # PROFILE BLOCK START
            'insert into history ( userid, committed, inserted )
                         values  (      ?,         ?,        ? );' ); # PROFILE BLOCK STOP

    # SQLite 2 needs a driver-specific call to find the id of the row
    # just inserted

    my $is_sqlite2 = ( $dbh->{Driver}->{Name} =~ /SQLite2?/ ) &&  # PROFILE BLOCK START
                     ( $dbh->{sqlite_version} =~ /^2\./ );        # PROFILE BLOCK STOP

    my $slot;

    # Keep inserting until we have a usable (defined, non-zero) row id.
    # A random number is stored in the committed column of the new row
    # and a failed insert is simply retried with another one.

    until ( defined( $slot ) && ( $slot != 0 ) ) {
        my $marker = int(rand( 1000000000 )+2);

        $self->log_( 2, "reserve_slot selected random number $marker" );

        next unless ( defined( $insert_sth->execute( 1, $marker, $inserted_time ) ) );

        $slot = $is_sqlite2                                           # PROFILE BLOCK START
              ? $dbh->func( 'last_insert_rowid' )
              : $dbh->last_insert_id( undef, undef, 'history', 'id' ); # PROFILE BLOCK STOP
    }

    $insert_sth->finish;

    $self->log_( 2, "reserve_slot returning slot id $slot" );

    return ( $slot, $self->get_slot_file( $slot ) );
}
322
323#----------------------------------------------------------------------------
324#
325# release_slot
326#
327# See description with reserve_slot; release_slot releases a history slot
328# previously allocated with reserve_slot and discards it.
329#
330# id              Unique ID returned by reserve_slot
331#
332#----------------------------------------------------------------------------
sub release_slot
{
    my ( $self, $slot ) = @_;

    # Drop the row from the database...

    my $sth = $self->db__()->prepare( 'delete from history where history.id = ?;' );
    $sth->execute( $slot );

    # ...and the message file from disk, if present

    my $file = $self->get_slot_file( $slot );

    unlink $file;

    # The directory that held the file may now be empty, and so may its
    # parents.  Walk up at most three levels, removing each directory
    # and stopping at the first one that cannot be removed (either we
    # aren't allowed to delete it or it wasn't empty).

    ( my $directory = $file ) =~ s/popfile[a-f0-9]{2}\.msg$//i;

    foreach my $level ( 1 .. 3 ) {
        last unless ( rmdir( $directory ) );
        $directory =~ s![a-f0-9]{2}/$!!i;
    }
}
370
371#----------------------------------------------------------------------------
372#
373# commit_slot
374#
375# See description with reserve_slot; commit_slot commits a history
376# slot to the database and makes it part of the history.  Before this
377# is called the full message should have been written to the file
378# returned by reserve_slot.  Note that commit_slot queues the message
379# for insertion and does not commit it until some (short) time later
380#
381# session         User session with Classifier::Bayes API
382# slot            Unique ID returned by reserve_slot
383# bucket          Bucket classified to
384# magnet          Magnet if used
385#
386#----------------------------------------------------------------------------
sub commit_slot
{
    my $self = shift;
    my ( $session, $slot, $bucket, $magnet ) = @_;

    # Nothing is written here: the details are posted on the message
    # queue as a COMIT message, which deliver() queues for
    # commit_history__

    return $self->mq_post_( 'COMIT', $session, $slot, $bucket, $magnet );
}
393
394#----------------------------------------------------------------------------
395#
396# change_slot_classification
397#
398# Used to 'reclassify' a message by changing its classification in the
399# database.
400#
401# slot         The slot to update
402# class        The new classification
403# session      A valid API session
404# undo         If set to 1 then indicates an undo operation
405#
406#----------------------------------------------------------------------------
sub change_slot_classification
{
    my ( $self, $slot, $class, $session, $undo ) = @_;

    $self->log_( 0, "Change slot classification of $slot to $class" );

    # Get the bucket ID associated with the new classification
    # then retrieve the current classification for this slot
    # and update the database

    my $bucketid = $self->{classifier__}->get_bucket_id(  # PROFILE BLOCK START
                           $session, $class );            # PROFILE BLOCK STOP

    # get_bucket_id can return undef when the bucket does not exist
    # (commit_history__ checks for the same thing).  Writing that into
    # history.bucketid would leave the row pointing at no bucket, so
    # refuse the change and leave the row untouched.

    if ( !defined( $bucketid ) ) {
        $self->log_( 0, "Couldn't find bucket ID for bucket $class when reclassifying $slot" );
        return;
    }

    # On an undo no previous bucket is recorded (usedtobe = 0).
    # Otherwise the current bucket id becomes usedtobe; it is column 10
    # of the row returned by get_slot_fields.  If the row cannot be read
    # fall back to 0 rather than storing NULL.

    my $oldbucketid = 0;
    if ( !$undo ) {
        my @fields = $self->get_slot_fields( $slot );
        $oldbucketid = $fields[10] if ( defined( $fields[10] ) );
    }

    my $h = $self->db__()->prepare(  # PROFILE BLOCK START
        'update history set bucketid = ?,
                            usedtobe = ?
                where id = ?;' );    # PROFILE BLOCK STOP
    $h->execute( $bucketid, $oldbucketid, $slot );

    # Open queries may now be stale

    $self->force_requery();
}
433
434#----------------------------------------------------------------------------
435#
436# revert_slot_classification
437#
# Used to undo a reclassification of a message by restoring its previous
# classification in the database.
440#
441# slot         The slot to update
442#
443#----------------------------------------------------------------------------
sub revert_slot_classification
{
    my ( $self, $slot ) = @_;

    # The bucket the message used to be in is the usedtobe column,
    # which is column 9 of the row returned by get_slot_fields

    my @fields = $self->get_slot_fields( $slot );
    my $oldbucketid = $fields[9];

    # usedtobe is stored as 0 when there is no previous classification
    # (see commit_history__ and change_slot_classification) and is
    # undefined when the row could not be read.  In both cases there is
    # nothing to revert to, and writing the value into bucketid would
    # leave the row pointing at no bucket.

    if ( !$oldbucketid ) {
        $self->log_( 0, 'No previous classification recorded for slot ' .   # PROFILE BLOCK START
                        ( defined( $slot ) ? $slot : '<undefined>' ) .
                        ', nothing to revert' );                            # PROFILE BLOCK STOP
        return;
    }

    my $h = $self->db__()->prepare(  # PROFILE BLOCK START
        'update history set bucketid = ?,
                            usedtobe = ?
                where id = ?;' );    # PROFILE BLOCK STOP
    $h->execute( $oldbucketid, 0, $slot );

    # Open queries may now be stale

    $self->force_requery();
}
458
459#---------------------------------------------------------------------------
460#
461# get_slot_fields
462#
463# Returns the fields associated with a specific slot.  We return the
464# same collection of fields as get_query_rows.
465#
466# slot           The slot id
467#
468#---------------------------------------------------------------------------
sub get_slot_fields
{
    my ( $self, $slot ) = @_;

    # Only a string of digits is accepted as a slot id

    return undef unless ( defined( $slot ) && ( $slot =~ /^\d+$/ ) );

    # Join against buckets and magnets and only consider committed rows

    my $sql =                                 # PROFILE BLOCK START
        "select $fields_slot from history, buckets, magnets
             where history.id        = ? and
                   buckets.id        = history.bucketid and
                   magnets.id        = magnetid and
                   history.committed = 1;";   # PROFILE BLOCK STOP

    my $sth = $self->db__()->prepare( $sql );
    $sth->execute( $slot );

    # One row at most is expected; it is returned as a plain list in
    # the column order of $fields_slot (empty if nothing matched)

    my @row = $sth->fetchrow_array;

    return @row;
}
485
486#---------------------------------------------------------------------------
487#
488# is_valid_slot
489#
490# Returns 1 if the slot ID passed in is valid
491#
492# slot           The slot id
493#
494#---------------------------------------------------------------------------
sub is_valid_slot
{
    my ( $self, $slot ) = @_;

    # Only a string of digits is accepted as a slot id

    return undef unless ( defined( $slot ) && ( $slot =~ /^\d+$/ ) );

    # Look for a committed history row with this id

    my $sth = $self->db__()->prepare(         # PROFILE BLOCK START
        'select id from history
             where history.id        = ? and
                   history.committed = 1;' ); # PROFILE BLOCK STOP
    $sth->execute( $slot );
    my @row = $sth->fetchrow_array;

    # No row at all means the slot is not valid

    return 0 unless ( @row );

    return ( $row[0] == $slot );
}
510
511#---------------------------------------------------------------------------
512#
513# commit_history__
514#
# (private) Used internally to write to the database the messages that
# have been queued by calls to commit_slot
517#
518#----------------------------------------------------------------------------
sub commit_history__
{
    my ( $self ) = @_;

    # Nothing queued means nothing to do (and no transaction to open)

    if ( $#{$self->{commit_list__}} == -1 ) {
        return;
    }

    my $update_history = $self->db__()->prepare(         # PROFILE BLOCK START
                'update history set hdr_from    = ?,
                                    hdr_to      = ?,
                                    hdr_date    = ?,
                                    hdr_cc      = ?,
                                    hdr_subject = ?,
                                    sort_from   = ?,
                                    sort_to     = ?,
                                    sort_cc     = ?,
                                    committed   = ?,
                                    bucketid    = ?,
                                    usedtobe    = ?,
                                    magnetid    = ?,
                                    hash        = ?,
                                    size        = ?
                                    where id    = ?;' ); # PROFILE BLOCK STOP

    # All queued entries are written inside a single transaction

    $self->db__()->begin_work;
    foreach my $entry (@{$self->{commit_list__}}) {
        my ( $session, $slot, $bucket, $magnet ) = @{$entry};

        my $file = $self->get_slot_file( $slot );

        # Committing to the history requires the following steps
        #
        # 1. Parse the message to extract the headers
        # 2. Compute MD5 hash of Message-ID, Date and Subject
        # 3. Update the related row with the headers and
        #    committed set to 1

        # Maps the lower-cased header name to a list of its values, in
        # the order in which they appear in the message

        my %header;

        # Three-argument open on a lexical handle: the file name is
        # never interpreted as a mode, and the handle cannot clash with
        # a FILE handle used elsewhere.  Lines are read into a lexical
        # so that the caller's $_ is left alone.

        if ( open( my $message, '<', $file ) ) {
            my $last;
            while ( defined( my $line = <$message> ) ) {
                $line =~ s/[\r\n]//g;

                # The first blank line ends the headers

                if ( $line eq '' ) {
                    last;
                }

                if ( $line =~ /^([^ \t]+):[ \t]*(.*)$/ ) {
                    $last = lc $1;
                    push @{$header{$last}}, $2;

                } else {

                    # Anything else continues the previous header (if
                    # there is one) and is appended to it as is

                    if ( defined $last ) {
                        ${$header{$last}}[$#{$header{$last}}] .= $line;
                    }
                }
            }
            close( $message );
        }
        else {
            $self->log_( 0, "Could not open history message file $file for reading." );
        }

        my $hash = $self->get_message_hash( ${$header{'message-id'}}[0],
                                            ${$header{'date'}}[0],
                                            ${$header{'subject'}}[0],
                                            ${$header{'received'}}[0] );

        # For sorting purposes the From, To and CC headers have special
        # cleaned up versions of themselves in the database.  The idea
        # is that case and certain characters should be ignored when
        # sorting these fields
        #
        # "John Graham-Cumming" <spam@jgc.org> maps to
        #     john graham-cumming spam@jgc.org

        my @sortable = ( 'from', 'to', 'cc' );
        my %sort_headers;

        foreach my $h (@sortable) {
            $sort_headers{$h} =           # PROFILE BLOCK START
                 $self->{classifier__}->{parser__}->decode_string(
                     ${$header{$h}}[0] ); # PROFILE BLOCK STOP
            $sort_headers{$h} = lc($sort_headers{$h} || '');
            $sort_headers{$h} =~ s/[\"<>]//g;
            $sort_headers{$h} =~ s/^[ \t]+//g;
            $sort_headers{$h} =~ s/\0//g;
        }

        # Make sure that the headers we are going to insert into
        # the database have been defined.  A missing or blank header
        # is replaced by a marker, except for Cc which becomes the
        # empty string.

        my @required = ( 'from', 'to', 'cc', 'subject' );

        foreach my $h (@required) {
            ${$header{$h}}[0] =           # PROFILE BLOCK START
                 $self->{classifier__}->{parser__}->decode_string(
                     ${$header{$h}}[0] ); # PROFILE BLOCK STOP

            if ( !defined ${$header{$h}}[0] || ${$header{$h}}[0] =~ /^\s*$/ ) {
                if ( $h ne 'cc' ) {
                    ${$header{$h}}[0] = "<$h header missing>";
                } else {
                    ${$header{$h}}[0] = '';
                }
            }

            ${$header{$h}}[0] =~ s/\0//g;
        }

        # If we do not have a date header then set the date to
        # 0 (start of the Unix epoch), otherwise parse the string
        # using Date::Parse to interpret it and turn it into the
        # Unix epoch.  A date that cannot be parsed also becomes 0.

        if ( !defined( ${$header{date}}[0] ) ) {
            ${$header{date}}[0] = 0;
        } else {
            ${$header{date}}[0] = str2time( ${$header{date}}[0] ) || 0;
        }

        # Figure out the ID of the bucket this message has been
        # classified into

        my $bucketid = $self->{classifier__}->get_bucket_id(  # PROFILE BLOCK START
                           $session, $bucket );               # PROFILE BLOCK STOP

        my $msg_size = -s $file;

        # If we can't get the bucket ID because the bucket doesn't exist
        # which could happen when we are upgrading the history which
        # has old bucket names in it then we will remove the entry from the
        # history and log the failure

        if ( defined( $bucketid ) ) {
            $update_history->execute(               # PROFILE BLOCK START
                    ${$header{from}}[0],    # hdr_from
                    ${$header{to}}[0],      # hdr_to
                    ${$header{date}}[0],    # hdr_date
                    ${$header{cc}}[0],      # hdr_cc
                    ${$header{subject}}[0], # hdr_subject
                    $sort_headers{from},    # sort_from
                    $sort_headers{to},      # sort_to
                    $sort_headers{cc},      # sort_cc
                    1,                      # committed
                    $bucketid,              # bucketid
                    0,                      # usedtobe
                    $magnet,                # magnetid
                    $hash,                  # hash
                    $msg_size,              # size
                    $slot                   # id
            );                                      # PROFILE BLOCK STOP
        } else {
            $self->log_( 0, "Couldn't find bucket ID for bucket $bucket when committing $slot" );
            $self->release_slot( $slot );
        }
    }
    $self->db__()->commit;
    $update_history->finish;

    # Empty the queue with an explicit array reference (assigning ()
    # would store undef) and invalidate the caches of open queries

    $self->{commit_list__} = [];
    $self->force_requery();
}
685
686# ---------------------------------------------------------------------------
687#
688# delete_slot
689#
690# Deletes an entry from the database and disk, optionally archiving it
691# if the archive parameters have been set
692#
693# $slot              The slot ID
694# $archive           1 if it's OK to archive this entry
695#
696# ---------------------------------------------------------------------------
sub delete_slot
{
    my ( $self, $slot, $archive ) = @_;

    my $file = $self->get_slot_file( $slot );
    $self->log_( 2, "delete_slot called for slot $slot, file $file" );

    # Archive a copy first, but only when the caller allows it and
    # archiving is switched on in the configuration

    if ( $archive && $self->config_( 'archive' ) ) {
        my $path = $self->get_user_path_( $self->config_( 'archive_dir' ), 0 );

        $self->make_directory__( $path );

        # Look up the name of the bucket the message is in.  The slot
        # is passed as a bind value, never interpolated into the SQL.

        my $row = $self->db__()->selectrow_arrayref(  # PROFILE BLOCK START
            'select buckets.name from history, buckets
                 where history.bucketid = buckets.id and
                       history.id = ?;',
            undef, $slot );                           # PROFILE BLOCK STOP

        # No row (or no name) means we cannot tell which bucket
        # directory to use, so there is nothing to archive

        my $bucket = defined( $row ) ? $row->[0] : undef;

        if ( defined( $bucket )               &&   # PROFILE BLOCK START
             ( $bucket ne 'unclassified' )    &&
             ( $bucket ne 'unknown class' ) ) {    # PROFILE BLOCK STOP
            $path .= "\/" . $bucket;
            $self->make_directory__( $path );

            if ( $self->config_( 'archive_classes' ) > 0) {

                # Archive to a random sub-directory of the bucket archive

                my $subdirectory = int( rand(                # PROFILE BLOCK START
                    $self->config_( 'archive_classes' ) ) ); # PROFILE BLOCK STOP
                $path .= "\/" . $subdirectory;
                $self->make_directory__( $path );
            }

            # Previous comment about this potentially being unsafe
            # (may have placed messages in unusual places, or
            # overwritten files) no longer applies. Files are now
            # placed in the user directory, in the archive_dir
            # subdirectory

            $self->copy_file__( $file, $path, "popfile$slot.msg" );
        }
    }

    # Now remove the entry from the database, and the file from disk,
    # and also invalidate the caches of any open queries since they
    # may have been affected

    $self->release_slot( $slot );
    $self->force_requery();
}
748
749#----------------------------------------------------------------------------
750#
751# start_deleting
752#
# Called before doing a block of calls to delete_slot.  This starts a
# database transaction so that the deletions are applied together when
# stop_deleting is called.
756#
757#----------------------------------------------------------------------------
sub start_deleting
{
    my $self = shift;

    # The call that tweaked the database through Classifier::Bayes is
    # disabled; only a transaction is opened:
    #
    #    $self->{classifier__}->tweak_sqlite( 1, 1, $self->db__() );

    return $self->db__()->begin_work;
}
765
766#----------------------------------------------------------------------------
767#
768# stop_deleting
769#
# Called after doing a block of calls to delete_slot.  This commits the
# database transaction opened by start_deleting.
772#
773#----------------------------------------------------------------------------
sub stop_deleting
{
    my $self = shift;

    # The call that untweaked the database through Classifier::Bayes is
    # disabled; only the transaction is committed:
    #
    #    $self->{classifier__}->tweak_sqlite( 1, 0, $self->db__() );

    return $self->db__()->commit;
}
781
782#----------------------------------------------------------------------------
783#
784# get_slot_file
785#
# Used to map a slot ID to the full path of the file that will contain
# the message associated with the slot
788#
789#----------------------------------------------------------------------------
sub get_slot_file
{
    my ( $self, $slot ) = @_;

    # The mapping between the slot and the file goes as follows:
    #
    # 1. Convert the slot ID to an 8 digit hex number (with leading
    #    zeroes).
    # 2. Call that number aabbccdd
    # 3. Build the path aa/bb/cc
    # 4. Name the file popfiledd.msg
    # 5. Add the msgdir location to obtain
    #        msgdir/aa/bb/cc/popfiledd.msg
    #
    # Hence each directory can have up to 256 entries

    my ( $aa, $bb, $cc, $dd ) =                                  # PROFILE BLOCK START
        ( sprintf( '%8.8x', $slot ) =~ /^(..)(..)(..)(..)/ );    # PROFILE BLOCK STOP

    # The top level directory lives under the user path...

    my $path = $self->get_user_path_(                            # PROFILE BLOCK START
                   $self->global_config_( 'msgdir' ) . $aa . '/', 0 ); # PROFILE BLOCK STOP
    $self->make_directory__( $path );

    # ...and each further level is created as the path is extended

    foreach my $level ( $bb, $cc ) {
        $path .= $level . '/';
        $self->make_directory__( $path );
    }

    return $path . 'popfile' . $dd . '.msg';
}
822
823#----------------------------------------------------------------------------
824#
825# get_message_hash
826#
# Used to compute an MD5 hash of the headers of a message
# so that the same message can later be identified by a
# call to get_slot_from_hash
830#
831# messageid              The message id header
832# date                   The date header
833# subject                The subject header
834# received               First Received header line
835#
836# Note that the values passed in are everything after the : in
837# header without the trailing \r or \n.  If a header is missing
838# then pass in the empty string
839#
840#----------------------------------------------------------------------------
sub get_message_hash
{
    my ( $self, $messageid, $date, $subject, $received ) = @_;

    # Each header is wrapped in square brackets, with an undefined
    # header treated as the empty string, and the four pieces are
    # hashed as one string

    my @pieces = map { defined( $_ ) ? "[$_]" : '[]' }     # PROFILE BLOCK START
                     ( $messageid, $date, $subject, $received ); # PROFILE BLOCK STOP

    return md5_hex( join( '', @pieces ) );
}
852
853#----------------------------------------------------------------------------
854#
855# get_slot_from_hash
856#
857# Given a hash value (returned by get_message_hash), find any
858# corresponding message in the database and return its slot
859# id.   If the message does not exist then return the empty
860# string.
861#
862# hash                 The hash value
863#
864#----------------------------------------------------------------------------
sub get_slot_from_hash
{
    my ( $self, $hash ) = @_;

    # At most one matching row is asked for

    my $sth = $self->db__()->prepare(                       # PROFILE BLOCK START
        'select id from history where hash = ? limit 1;' ); # PROFILE BLOCK STOP
    $sth->execute( $hash );
    my $row = $sth->fetchrow_arrayref;

    # No row: the message is not in the history

    return '' unless ( defined( $row ) );

    return $row->[0];
}
876
877#----------------------------------------------------------------------------
878#
879# QUERYING THE HISTORY
880#
881# 1. Start a query session by calling start_query and obtain a unique
882#    ID
883#
884# 2. Set the query parameter (i.e. sort, search and filter) with a call
885#    to set_query
886#
887# 3. Obtain the number of history rows returned by calling get_query_size
888#
889# 4. Get segments of the history returned by calling get_query_rows with
890#    the start and end rows needed
891#
892# 5. When finished with the query call stop_query
893#
894#----------------------------------------------------------------------------
895
896#----------------------------------------------------------------------------
897#
898# start_query
899#
# Used to start a query session, returns a unique ID for this
# query.  When the caller is done with the query they call
# stop_query.
903#
904#----------------------------------------------------------------------------
sub start_query
{
    my ( $self ) = @_;

    # Think of a large random number (as 8 hex digits) and keep
    # trying until we find one that is not already in use

    my $id;

    do {
        $id = sprintf( '%8.8x', int(rand(4294967295)) );
    } while ( defined( $self->{queries__}{$id} ) );

    # A fresh query has no statement yet, no rows counted and an
    # undefined cache

    $self->{queries__}{$id} = {   # PROFILE BLOCK START
        query => 0,
        count => 0,
        cache => undef,
    };                            # PROFILE BLOCK STOP

    return $id;
}
923
924#----------------------------------------------------------------------------
925#
926# stop_query
927#
928# Used to clean up after a query session
929#
930# id                The ID returned by start_query
931#
932#----------------------------------------------------------------------------
sub stop_query
{
    my ( $self, $id ) = @_;

    # query is still 0 (set by start_query) when set_query was never
    # called, so only a real statement handle needs any clean up

    my $q = $self->{queries__}{$id}{query};

    if ( ref( $q ) ) {

        # If the cache size hasn't grown to the row count then we
        # didn't fetch everything and so we call finish to clean up.
        # $#{...} is the last index, hence the + 1 to get the number of
        # cached rows (this form also copes with a cache that is still
        # undefined)

        my $cached = $#{$self->{queries__}{$id}{cache}} + 1;

        if ( $cached != $self->{queries__}{$id}{count} ) {
            $q->finish;
        }
    }

    # Dropping the entry releases the statement handle and the cached
    # rows together

    delete $self->{queries__}{$id};
}
953
954#----------------------------------------------------------------------------
955#
956# set_query
957#
958# Called to set up a query with sort, filter and search options
959#
960# id            The ID returned by start_query
961# filter        Name of bucket to filter on
962# search        From/Subject line to search for
# sort          The field to sort on (inserted, from, to, cc, subject,
#               bucket, date, size) with an optional leading - for a
#               descending sort; anything else selects the default order
965# not           If set to 1 negates the search
966#
967#----------------------------------------------------------------------------
sub set_query
{
    my ( $self, $id, $filter, $search, $sort, $not ) = @_;

    # Missing arguments are treated as "not supplied" so that none of
    # the string operations below ever sees undef

    foreach my $arg ( $filter, $search, $sort, $not ) {
        $arg = '' if ( !defined( $arg ) );
    }

    $search =~ s/\0//g;

    # Only these sort keys may reach the SQL text.  \A and \z are used
    # (rather than ^ and $) so that a trailing newline cannot slip
    # through the check

    $sort = '' if ( $sort !~ /\A-?(?:inserted|from|to|cc|subject|bucket|date|size)\z/ );

    # If this query has already been done and is in the cache
    # then do no work here

    if ( defined( $self->{queries__}{$id}{fields} ) &&  # PROFILE BLOCK START
         ( $self->{queries__}{$id}{fields} eq
             "$filter:$search:$sort:$not" ) ) {         # PROFILE BLOCK STOP
        return;
    }

    $self->{queries__}{$id}{fields} = "$filter:$search:$sort:$not";

    # We do two queries, the first to get the total number of rows that
    # would be returned and then we start the real query.  This is done
    # so that we know the size of the resulting data without having
    # to retrieve it all.  XXX stands for the select list, which is
    # filled in later (here and in delete_query)

    $self->{queries__}{$id}{base} =                          # PROFILE BLOCK START
        'select XXX from history, buckets, magnets
                where history.userid = 1 and committed = 1'; # PROFILE BLOCK STOP

    $self->{queries__}{$id}{base} .= ' and history.bucketid = buckets.id';
    $self->{queries__}{$id}{base} .= ' and magnets.id = magnetid';

    # $not flips every test below:
    #
    #   $not_word   prefixes the search clause with "not"
    #   $matches    compares the bucket name (= normally, != negated)
    #   $is_set     tests a column against 0 (!= normally, = negated)

    my $not_word = $not ? 'not' : '';
    my $matches  = $not ? '!='  : '=';
    my $is_set   = $not ? '='   : '!=';

    # If there's a search portion then add the appropriate clause
    # to find the from/subject header.  The pattern is quoted by the
    # database handle before it is placed in the SQL

    if ( $search ne '' ) {
        $search = $self->db__()->quote( '%' . $search . '%' );
        $self->{queries__}{$id}{base} .= " and $not_word ( hdr_from like $search or hdr_subject like $search )";
    }

    # If there's a filter option then add the appropriate clause: two
    # pseudo filters select on the magnet and reclassified columns,
    # anything else is taken to be a bucket name

    if ( $filter eq '__filter__magnet' ) {
        $self->{queries__}{$id}{base} .= " and history.magnetid $is_set 0";
    } elsif ( $filter eq '__filter__reclassified' ) {
        $self->{queries__}{$id}{base} .= " and history.usedtobe $is_set 0";
    } elsif ( $filter ne '' ) {
        my $bucket = $self->db__()->quote( $filter );
        $self->{queries__}{$id}{base} .= " and buckets.name $matches $bucket";
    }

    # Add the sort option (if there is one)

    if ( $sort ne '' ) {

        # The direction comes from the result of the substitution
        # itself; $1 must not be consulted here because it keeps the
        # value of an earlier match when this substitution fails

        my $direction = ( $sort =~ s/\A-// ) ? 'desc' : 'asc';

        my %column_for = (              # PROFILE BLOCK START
            bucket   => 'buckets.name',
            from     => 'sort_from',
            to       => 'sort_to',
            cc       => 'sort_cc',
            inserted => 'inserted',
            size     => 'size',
            subject  => 'hdr_subject',
            date     => 'hdr_date' );   # PROFILE BLOCK STOP

        $self->{queries__}{$id}{base} .= " order by $column_for{$sort} $direction;";
    } else {
        $self->{queries__}{$id}{base} .= ' order by inserted desc;';
    }

    my $count = $self->{queries__}{$id}{base};
    $self->log_( 2, "Base query is $count" );
    $count =~ s/XXX/COUNT(*)/;

    my $h = $self->db__()->prepare( $count );
    $h->execute;
    my $row = $h->fetchrow_arrayref;

    # No row at all is treated as an empty result rather than being
    # dereferenced

    $self->{queries__}{$id}{count} = defined( $row ) ? $row->[0] : 0;
    $h->finish;

    # The real query is only prepared here; get_query_rows executes it
    # when rows are actually needed

    my $select = $self->{queries__}{$id}{base};
    $select =~ s/XXX/$fields_slot/;
    $self->{queries__}{$id}{query} = $self->db__()->prepare( $select );
    $self->{queries__}{$id}{cache} = [];

    return;
}
1064
1065#----------------------------------------------------------------------------
1066#
1067# delete_query
1068#
1069# Called to delete all the rows returned in a query
1070#
1071# id            The ID returned by start_query
1072#
1073#----------------------------------------------------------------------------
sub delete_query
{
    my ( $self, $id ) = @_;

    # The base statement is built by set_query; without it there is
    # nothing to select and therefore nothing to delete

    my $delete = $self->{queries__}{$id}{base};
    return if ( !defined( $delete ) );

    $self->start_deleting();

    # Reuse the query but fetch only the id of each matching row

    $delete =~ s/XXX/history.id/;
    my $d = $self->db__()->prepare( $delete );
    $d->execute;

    # All the ids are collected before anything is deleted, presumably
    # so that delete_slot never runs while this statement is still
    # being read

    my $history_id;
    my @ids;
    $d->bind_columns( \$history_id );
    while ( $d->fetchrow_arrayref ) {
        push ( @ids, $history_id );
    }
    $d->finish;

    foreach my $slot (@ids) {
        $self->delete_slot( $slot, 1 );
    }

    $self->stop_deleting();
}
1097
1098#----------------------------------------------------------------------------
1099#
1100# get_query_size
1101#
1102# Called to return the number of elements in the query.
1103# Should only be called after a call to set_query.
1104#
1105# id            The ID returned by start_query
1106#
1107#----------------------------------------------------------------------------
sub get_query_size
{
    my ( $self, $id ) = @_;

    # The count is stored against the query ID by set_query, which runs
    # a COUNT(*) version of the query

    my $row_count = $self->{queries__}{$id}{count};

    return $row_count;
}
1114
1115#----------------------------------------------------------------------------
1116#
1117# get_query_rows
1118#
# Returns $count rows beginning at row $start from a query that has
# already been set up with a call to set_query.  The first row is row 1.
# Rows past the end of the result are returned as undef.
1121#
1122# id            The ID returned by start_query
1123# start         The first row to return
1124# count         Number of rows to return
1125#
1126# Each row contains the fields:
1127#
1128#    id (0), from (1), to (2), cc (3), subject (4), date (5), hash (6),
1129#    inserted date (7), bucket name (8), reclassified id (9), bucket id (10),
1130#    magnet value (11), size (12)
1131#----------------------------------------------------------------------------
sub get_query_rows
{
    my ( $self, $id, $start, $count ) = @_;

    # Rows are numbered from 1, so this is the number of the last row
    # the caller wants

    my $last_row = $start + $count - 1;

    # Number of rows already held in the cache for this query

    my $size = $#{$self->{queries__}{$id}{cache}} + 1;

    $self->log_( 2, "Request for rows $start ($count), current size $size" );

    # When the cache does not reach far enough the query is run again
    # and everything from the first row up to the last wanted row is
    # fetched, replacing the cache

    if ( $size < $last_row ) {
        my $rows = $start + $count - $size;
        $self->log_( 2, "Getting $rows rows from database" );

        my $sth = $self->{queries__}{$id}{query};
        $sth->execute;
        $self->{queries__}{$id}{cache} = $sth->fetchall_arrayref( undef, $last_row );
        $sth->finish;
    }

    # Convert the 1-based row numbers into 0-based cache indexes

    my $from = $start - 1;
    my $to   = $last_row - 1;

    $self->log_( 2, "Returning $from..$to" );

    return @{$self->{queries__}{$id}{cache}}[$from..$to];
}
1160
1161# ---------------------------------------------------------------------------
1162#
1163# make_directory__
1164#
# Wrapper for mkdir that ensures that the path we are making doesn't end in
# / or \ (done because you can't do mkdir 'foo/' on NextStep).
1167#
1168# $path        The directory to make
1169#
1170# Returns whatever mkdir returns
1171#
1172# ---------------------------------------------------------------------------
sub make_directory__
{
    my ( $self, $path ) = @_;

    # Drop one trailing / or \ before the path is tested or created

    $path =~ s/[\\\/]$//;

    # A directory that is already there counts as success, otherwise the
    # result is whatever mkdir reports

    return ( -d $path ) ? 1 : mkdir( $path );
}
1182
1183# ---------------------------------------------------------------------------
1184#
1185# compare_mf__
1186#
1187# Compares two mailfiles, used for sorting mail into order
1188#
1189# ---------------------------------------------------------------------------
sub compare_mf__
{
    # Sort comparator (used as "sort compare_mf__ ..."), so the two
    # names being compared arrive in $a and $b.
    #
    # The matches are done in list context so that a name which does
    # not have the popfileN=M.msg form yields no numbers at all; $1 and
    # $2 would silently keep the captures of the previous successful
    # match

    my ( $ad, $am ) = ( $a =~ /popfile(\d+)=(\d+)\.msg/ );
    my ( $bd, $bm ) = ( $b =~ /popfile(\d+)=(\d+)\.msg/ );

    # Names that did not match count as 0=0, which puts them after
    # every well formed name

    foreach my $number ( $ad, $am, $bd, $bm ) {
        $number = 0 if ( !defined( $number ) );
    }

    # Descending on the first number, ties broken by descending on the
    # second number

    return ( $bd <=> $ad ) || ( $bm <=> $am );
}
1204
1205# ---------------------------------------------------------------------------
1206#
1207# upgrade_history_files__
1208#
1209# Looks for old .MSG/.CLS history entries and sticks them in the database
1210#
1211# ---------------------------------------------------------------------------
sub upgrade_history_files__
{
    my ( $self ) = @_;

    # See if there are any .MSG files in the msgdir, and if there are
    # upgrade them by placing them in the database

    my @msgs = sort compare_mf__ glob $self->get_user_path_(     # PROFILE BLOCK START
        $self->global_config_( 'msgdir' ) . 'popfile*.msg', 0 ); # PROFILE BLOCK STOP

    if ( $#msgs != -1 ) {
        my $session = $self->{classifier__}->get_session_key( 'admin', '' );

        print "\nFound old history files, moving them into database\n    ";

        my $i = 0;
        $self->db__()->begin_work;
        foreach my $msg (@msgs) {

            # Show progress after every 100 messages

            if ( ( ++$i % 100 ) == 0 ) {
                print "[$i]";
                flush STDOUT;
            }

            # NOTE.  Only the bucket is kept from the class file, so
            # reclassified messages will no longer appear reclassified
            # in upgraded history and upgraded history will have no
            # magnet information.

            my $bucket = ( $self->history_read_class__( $msg ) )[1];

            # history_read_class__ reports a class file that could not
            # be read as 'unknown class' (with a space).  Such messages
            # are left where they are instead of being given a slot

            next if ( $bucket eq 'unknown class' );

            my ( $slot, $file ) = $self->reserve_slot();

            # NOTE(review): level 0 is assumed to be a level that is
            # always written to the log - confirm.  A failed move is
            # only reported; the slot is still queued as before

            if ( !rename( $msg, $file ) ) {
                $self->log_( 0, "Failed to move $msg to $file: $!" );
            }

            my @message = ( $session, $slot, $bucket, 0 );
            push ( @{$self->{commit_list__}}, \@message );
        }
        $self->db__()->commit;

        print "\nDone upgrading history\n";

        # Process the queued entries, then give the session key back

        $self->commit_history__();
        $self->{classifier__}->release_session_key( $session );

        unlink $self->get_user_path_(                                 # PROFILE BLOCK START
            $self->global_config_( 'msgdir' ) . 'history_cache', 0 ); # PROFILE BLOCK STOP
    }
}
1261
1262# ---------------------------------------------------------------------------
1263#
1264# history_read_class__ - load and delete the class file for a message.
1265#
1266# returns: ( reclassified, bucket, usedtobe, magnet )
1267#   values:
1268#       reclassified:   boolean, true if message has been reclassified
1269#       bucket:         string, the bucket the message is in presently,
1270#                       unknown class if an error occurs
1271#       usedtobe:       string, the bucket the message used to be in
1272#                       (null if not reclassified)
1273#       magnet:         string, the magnet
1274#
1275# $filename     The name of the message to load the class for
1276#
1277# ---------------------------------------------------------------------------
sub history_read_class__
{
    my ( $self, $filename ) = @_;

    # The class file sits next to the message and has the same name
    # with a cls extension

    $filename =~ s/msg$/cls/;

    my $reclassified = 0;
    my $bucket;
    my $usedtobe;
    my $magnet = '';

    # Three-argument open with a lexical handle: the file name is never
    # interpreted as an open mode and the handle cannot clash with
    # another use of the same bareword

    my $class;
    if ( !open( $class, '<', $filename ) ) {
        return ( undef, 'unknown class', undef, undef );
    }

    # The first line is either "<bucket>", "<bucket> MAGNET <magnet>"
    # or the word RECLASSIFIED

    $bucket = <$class>;
    if ( defined( $bucket ) &&                        # PROFILE BLOCK START
       ( $bucket =~ /([^ ]+) MAGNET ([^\r\n]+)/ ) ) { # PROFILE BLOCK STOP
        $bucket = $1;
        $magnet = $2;
    }

    # A reclassified message has the current bucket on the second line
    # and the bucket it used to be in on the third

    if ( defined( $bucket ) && ( $bucket =~ /RECLASSIFIED/ ) ) {
        $bucket       = <$class>;
        $usedtobe     = <$class>;
        $reclassified = 1;
    }
    close $class;

    # Either line may be missing when the file is truncated, so both
    # are checked before the line endings are removed

    $bucket   =~ s/[\r\n]//g if defined( $bucket );
    $usedtobe =~ s/[\r\n]//g if defined( $usedtobe );

    # The class file is consumed by reading it

    unlink $filename;

    $bucket = 'unknown class' if ( !defined( $bucket ) );

    return ( $reclassified, $bucket, $usedtobe, $magnet );
}
1315
1316#----------------------------------------------------------------------------
1317#
1318# cleanup_history
1319#
# Deletes the history entries that were inserted more than the number
# of days configured as history_days ago.
1322#
1323#----------------------------------------------------------------------------
sub cleanup_history
{
    my ( $self ) = @_;

    # Entries inserted before this time (in seconds, as returned by
    # time) are older than history_days days

    my $day_length = 24 * 60 * 60;
    my $cutoff     = time - $self->config_( 'history_days' ) * $day_length;

    my $sth = $self->db__()->prepare(   # PROFILE BLOCK START
        'select id from history
                where inserted < ?;' ); # PROFILE BLOCK STOP
    $sth->execute( $cutoff );

    # Gather every expired id and release the statement before the
    # first slot is deleted

    my $expired_id;
    my @expired;
    $sth->bind_columns( \$expired_id );
    push ( @expired, $expired_id ) while ( $sth->fetchrow_arrayref );
    $sth->finish;

    for my $slot ( @expired ) {
        $self->delete_slot( $slot, 1 );
    }
}
1345
1346# ---------------------------------------------------------------------------
1347#
1348# copy_file__
1349#
# Utility to copy a file into a directory.  The destination directory
# must already exist; it is not created here
1352#
1353# $from               Where to copy from
1354# $to_dir             The directory it will be copied to
1355# $to_name            The name of the destination (without the directory)
1356#
1357# ---------------------------------------------------------------------------
sub copy_file__
{
    my ( $self, $from, $to_dir, $to_name ) = @_;

    # Copies $from to $to_dir/$to_name byte for byte.  Returns 1 when
    # the copy was written completely and 0 otherwise.  The destination
    # directory is not created here.
    #
    # Three-argument open with lexical handles is used so that leading
    # or trailing white space and mode characters in a file name are
    # taken literally

    my $in;
    if ( !open( $in, '<', $from ) ) {
        return 0;
    }

    my $out;
    if ( !open( $out, '>', "$to_dir/$to_name" ) ) {
        close $in;
        return 0;
    }

    binmode $in;
    binmode $out;

    # The data is copied in blocks rather than lines since binary data
    # may not contain any line endings at all

    my $ok = 1;
    my $buffer;

    while (1) {
        my $got = read( $in, $buffer, 65536 );

        if ( !defined( $got ) ) {
            $ok = 0;
            last;
        }

        last if ( $got == 0 );

        if ( !print {$out} $buffer ) {
            $ok = 0;
            last;
        }
    }

    # Buffered write errors only show up when the handle is closed

    $ok = 0 if ( !close( $out ) );
    close $in;

    return $ok;
}
1375
1376# ---------------------------------------------------------------------------
1377#
1378# force_requery
1379#
1380# Called when the database has changed to invalidate any queries that are
1381# open so that cached data is not returned and the database is requeried
1382#
1383# ---------------------------------------------------------------------------
sub force_requery
{
    my ( $self ) = @_;

    # Blank the remembered parameter string of every open query.
    # set_query compares its arguments against this string, so the next
    # call finds a difference and builds the query afresh

    $self->{queries__}{$_}{fields} = '' for ( keys %{$self->{queries__}} );
}
1393
1394# SETTER
1395
sub classifier
{
    my $self = shift;
    my ( $classifier ) = @_;

    # Remember the classifier object; other methods reach it through
    # $self->{classifier__}

    return $self->{classifier__} = $classifier;
}
1402
14031;
1404