1# POPFILE LOADABLE MODULE 2package POPFile::History; 3 4use POPFile::Module; 5@ISA = ("POPFile::Module"); 6 7#---------------------------------------------------------------------------- 8# 9# This module handles POPFile's history. It manages entries in the POPFile 10# database and on disk that store messages previously classified by POPFile. 11# 12# Copyright (c) 2001-2011 John Graham-Cumming 13# 14# This file is part of POPFile 15# 16# POPFile is free software; you can redistribute it and/or modify it 17# under the terms of version 2 of the GNU General Public License as 18# published by the Free Software Foundation. 19# 20# POPFile is distributed in the hope that it will be useful, 21# but WITHOUT ANY WARRANTY; without even the implied warranty of 22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 23# GNU General Public License for more details. 24# 25# You should have received a copy of the GNU General Public License 26# along with POPFile; if not, write to the Free Software 27# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 28# 29#---------------------------------------------------------------------------- 30 31use strict; 32use warnings; 33use locale; 34 35use Date::Parse; 36use Digest::MD5 qw( md5_hex ); 37 38my $fields_slot = # PROFILE BLOCK START 39'history.id, hdr_from, hdr_to, hdr_cc, hdr_subject, hdr_date, hash, inserted, 40 buckets.name, usedtobe, history.bucketid, magnets.val, size'; # PROFILE BLOCK STOP 41 42#---------------------------------------------------------------------------- 43# new 44# 45# Class new() function 46#---------------------------------------------------------------------------- 47sub new 48{ 49 my $proto = shift; 50 my $class = ref($proto) || $proto; 51 my $self = POPFile::Module->new(); 52 53 # List of committed history items waiting to be committed 54 # into the database, it consists of lists containing three 55 # elements: the slot id, the bucket classified to and the 56 # magnet if used 57 58 $self->{commit_list__} = (); 59 60 # Contains queries started with start_query and consists 61 # of a mapping between unique IDs and quadruples containing 62 # a reference to the SELECT and a cache of already fetched 63 # rows and a total row count. These quadruples are implemented 64 # as a sub-hash with keys query, count, cache, fields 65 66 $self->{queries__} = (); 67 68 $self->{firsttime__} = 1; 69 70 # Will contain the database handle retrieved from 71 # Classifier::Bayes 72 73 $self->{db__} = undef; 74 75 $self->{classifier__} = 0; 76 77 bless($self, $class); 78 79 $self->name( 'history' ); 80 81 return $self; 82} 83 84#---------------------------------------------------------------------------- 85# 86# initialize 87# 88# Called to initialize the history module 89# 90#---------------------------------------------------------------------------- 91sub initialize 92{ 93 my ( $self ) = @_; 94 95 # Keep the history for two days 96 97 $self->config_( 'history_days', 2 ); 98 99 # If 1, Messages are saved to an archive when they are removed or expired 100 # from the history cache 101 102 $self->config_( 'archive', 0 ); 103 104 # The directory where messages will be archived to, in sub-directories for 105 # each bucket 106 107 $self->config_( 'archive_dir', 'archive' ); 108 109 # This is an advanced setting which will save archived files to a 110 # randomly numbered sub-directory, if set to greater than zero, otherwise 111 # messages will be saved in the bucket directory 112 # 113 # 0 <= directory name < archive_classes 114 115 $self->config_( 'archive_classes', 0 ); 116 117 # Need TICKD message for history clean up, COMIT when a message 118 # is committed to the history 119 120 $self->mq_register_( 'TICKD', $self ); 121 $self->mq_register_( 'COMIT', $self ); 122 123 return 1; 124} 125 126#---------------------------------------------------------------------------- 127# 128# stop 129# 130# Called to stop the history module 131# 132#---------------------------------------------------------------------------- 133sub stop 134{ 135 my ( $self ) = @_; 136 137 # Commit any remaining history items. This is needed because it's 138 # possible that we get called with a stop after things have been 139 # added to the queue and before service() is called 140 141 $self->commit_history__(); 142 143 if ( defined( $self->{db__} ) ) { 144 $self->{db__}->disconnect; 145 $self->{db__} = undef; 146 } 147} 148 149#---------------------------------------------------------------------------- 150# 151# db__ 152# 153# Since we don't know the order in which the start() methods of PLMs 154# is called we cannot be sure that Classifier::Bayes will have started 155# and connected to the database before us, hence we can't set our 156# database handle at start time. So instead we access the db handle 157# through this method 158# 159#---------------------------------------------------------------------------- 160sub db__ 161{ 162 my ( $self ) = @_; 163 164 if ( !defined( $self->{db__} ) ) { 165 $self->{db__} = $self->{classifier__}->db()->clone; 166 } 167 168 return $self->{db__}; 169} 170 171#---------------------------------------------------------------------------- 172# 173# service 174# 175# Called periodically so that the module can do its work 176# 177#---------------------------------------------------------------------------- 178sub service 179{ 180 my ( $self ) = @_; 181 182 if ( $self->{firsttime__} ) { 183 $self->upgrade_history_files__(); 184 $self->{firsttime__} = 0; 185 } 186 187 # Note when we go to multiuser POPFile we'll need to change this call 188 # so that we are sure that the session IDs that it is using are still 189 # valid. The easiest way will be to call it in deliver() when we get 190 # a COMIT message. 191 192 $self->commit_history__(); 193 194 return 1; 195} 196 197#---------------------------------------------------------------------------- 198# 199# deliver 200# 201# Called by the message queue to deliver a message 202# 203# There is no return value from this method 204# 205#---------------------------------------------------------------------------- 206sub deliver 207{ 208 my ( $self, $type, @message ) = @_; 209 210 # If a day has passed then clean up the history 211 212 if ( $type eq 'TICKD' ) { 213 $self->cleanup_history(); 214 } 215 216 if ( $type eq 'COMIT' ) { 217 push ( @{$self->{commit_list__}}, \@message ); 218 } 219} 220 221# --------------------------------------------------------------------------- 222# 223# forked 224# 225# This is called inside a child process that has just forked, since the 226# child needs access to the database we open it 227# 228# --------------------------------------------------------------------------- 229sub forked 230{ 231 my ( $self ) = @_; 232 233 $self->{db__} = undef; 234} 235 236#---------------------------------------------------------------------------- 237# 238# ADDING TO THE HISTORY 239# 240# To add a message to the history the following sequence of calls 241# is made: 242# 243# 1. Obtain a unique ID and filename for the new message by a call 244# to reserve_slot 245# 246# 2. Write the message into the filename returned 247# 248# 3. Call commit_slot with the bucket into which the message was 249# classified 250# 251# If an error occurs after #1 and the slot is unneeded then call 252# release_slot 253# 254#---------------------------------------------------------------------------- 255# 256# FINDING A HISTORY ENTRY 257# 258# 1. If you know the slot id then call get_slot_file to obtain 259# the full path where the file is stored 260# 261# 2. If you know the message hash then call get_slot_from hash 262# to get the slot id 263# 264# 3. If you know the message headers then use get_message_hash 265# to get the hash 266# 267#---------------------------------------------------------------------------- 268 269#---------------------------------------------------------------------------- 270# 271# reserve_slot 272# 273# Called to reserve a place in the history for a message that is in the 274# process of being received. It returns a unique ID for this slot and 275# the full path to the file where the message should be stored. The 276# caller is expected to later call either release_slot (if the slot is not 277# going to be used) or commit_slot (if the file has been written and the 278# entry should be added to the history). 279# 280# The only parameter is optional and exists for the sake of the test- 281# suite: you can pass in the time at which the message was inserted, 282# ie. the time at which the message arrived. 283#---------------------------------------------------------------------------- 284sub reserve_slot 285{ 286 my $self = shift; 287 my $inserted_time = shift || time; 288 289 my $insert_sth = $self->db__()->prepare( # PROFILE BLOCK START 290 'insert into history ( userid, committed, inserted ) 291 values ( ?, ?, ? );' ); # PROFILE BLOCK STOP 292 my $is_sqlite2 = ( $self->db__()->{Driver}->{Name} =~ /SQLite2?/ ) && # PROFILE BLOCK START 293 ( $self->db__()->{sqlite_version} =~ /^2\./ ); # PROFILE BLOCK STOP 294 295 my $slot; 296 297 while ( !defined($slot) || $slot == 0 ) { 298 my $r = int(rand( 1000000000 )+2); 299 300 $self->log_( 2, "reserve_slot selected random number $r" ); 301 302 # Get the date/time now which will be stored in the database 303 # so that we can sort on the Date: header in the message and 304 # when we received it 305 306 my $result = $insert_sth->execute( 1, $r, $inserted_time ); 307 next if ( !defined( $result ) ); 308 309 if ( $is_sqlite2 ) { 310 $slot = $self->db__()->func( 'last_insert_rowid' ); 311 } else { 312 $slot = $self->db__()->last_insert_id( undef, undef, 'history', 'id' ); 313 } 314 } 315 316 $insert_sth->finish; 317 318 $self->log_( 2, "reserve_slot returning slot id $slot" ); 319 320 return ( $slot, $self->get_slot_file( $slot ) ); 321} 322 323#---------------------------------------------------------------------------- 324# 325# release_slot 326# 327# See description with reserve_slot; release_slot releases a history slot 328# previously allocated with reserve_slot and discards it. 329# 330# id Unique ID returned by reserve_slot 331# 332#---------------------------------------------------------------------------- 333sub release_slot 334{ 335 my ( $self, $slot ) = @_; 336 337 # Remove the entry from the database and delete the file 338 # if present 339 340 my $delete = 'delete from history where history.id = ?;'; 341 342 my $h = $self->db__()->prepare( $delete ); 343 $h->execute( $slot ); 344 345 my $file = $self->get_slot_file( $slot ); 346 347 unlink $file; 348 349 # It's now possible that the directory for the slot file is empty 350 # and we want to delete it so that things get cleaned up 351 # automatically 352 353 my $directory = $file; 354 $directory =~ s/popfile[a-f0-9]{2}\.msg$//i; 355 356 my $depth = 3; 357 358 while ( $depth > 0 ) { 359 if ( rmdir( $directory ) ) { 360 $directory =~ s![a-f0-9]{2}/$!!i; 361 $depth--; 362 } 363 else { 364 # We either aren't allowed to delete the 365 # directory or it wasn't empty 366 last; 367 } 368 } 369} 370 371#---------------------------------------------------------------------------- 372# 373# commit_slot 374# 375# See description with reserve_slot; commit_slot commits a history 376# slot to the database and makes it part of the history. Before this 377# is called the full message should have been written to the file 378# returned by reserve_slot. Note that commit_slot queues the message 379# for insertion and does not commit it until some (short) time later 380# 381# session User session with Classifier::Bayes API 382# slot Unique ID returned by reserve_slot 383# bucket Bucket classified to 384# magnet Magnet if used 385# 386#---------------------------------------------------------------------------- 387sub commit_slot 388{ 389 my ( $self, $session, $slot, $bucket, $magnet ) = @_; 390 391 $self->mq_post_( 'COMIT', $session, $slot, $bucket, $magnet ); 392} 393 394#---------------------------------------------------------------------------- 395# 396# change_slot_classification 397# 398# Used to 'reclassify' a message by changing its classification in the 399# database. 400# 401# slot The slot to update 402# class The new classification 403# session A valid API session 404# undo If set to 1 then indicates an undo operation 405# 406#---------------------------------------------------------------------------- 407sub change_slot_classification 408{ 409 my ( $self, $slot, $class, $session, $undo ) = @_; 410 411 $self->log_( 0, "Change slot classification of $slot to $class" ); 412 413 # Get the bucket ID associated with the new classification 414 # then retrieve the current classification for this slot 415 # and update the database 416 417 my $bucketid = $self->{classifier__}->get_bucket_id( # PROFILE BLOCK START 418 $session, $class ); # PROFILE BLOCK STOP 419 420 my $oldbucketid = 0; 421 if ( !$undo ) { 422 my @fields = $self->get_slot_fields( $slot ); 423 $oldbucketid = $fields[10]; 424 } 425 426 my $h = $self->db__()->prepare( # PROFILE BLOCK START 427 'update history set bucketid = ?, 428 usedtobe = ? 429 where id = ?;' ); # PROFILE BLOCK STOP 430 $h->execute( $bucketid, $oldbucketid, $slot ); 431 $self->force_requery(); 432} 433 434#---------------------------------------------------------------------------- 435# 436# revert_slot_classification 437# 438# Used to undo a 'reclassify' a message by changing its classification 439# in the database. 440# 441# slot The slot to update 442# 443#---------------------------------------------------------------------------- 444sub revert_slot_classification 445{ 446 my ( $self, $slot ) = @_; 447 448 my @fields = $self->get_slot_fields( $slot ); 449 my $oldbucketid = $fields[9]; 450 451 my $h = $self->db__()->prepare( # PROFILE BLOCK START 452 'update history set bucketid = ?, 453 usedtobe = ? 454 where id = ?;' ); # PROFILE BLOCK STOP 455 $h->execute( $oldbucketid, 0, $slot ); 456 $self->force_requery(); 457} 458 459#--------------------------------------------------------------------------- 460# 461# get_slot_fields 462# 463# Returns the fields associated with a specific slot. We return the 464# same collection of fields as get_query_rows. 465# 466# slot The slot id 467# 468#--------------------------------------------------------------------------- 469sub get_slot_fields 470{ 471 my ( $self, $slot ) = @_; 472 473 return undef if ( !defined( $slot ) || $slot !~ /^\d+$/ ); 474 475 my $h = $self->db__()->prepare( # PROFILE BLOCK START 476 "select $fields_slot from history, buckets, magnets 477 where history.id = ? and 478 buckets.id = history.bucketid and 479 magnets.id = magnetid and 480 history.committed = 1;" ); # PROFILE BLOCK STOP 481 $h->execute( $slot ); 482 my @result = $h->fetchrow_array; 483 return @result; 484} 485 486#--------------------------------------------------------------------------- 487# 488# is_valid_slot 489# 490# Returns 1 if the slot ID passed in is valid 491# 492# slot The slot id 493# 494#--------------------------------------------------------------------------- 495sub is_valid_slot 496{ 497 my ( $self, $slot ) = @_; 498 499 return undef if ( !defined( $slot ) || $slot !~ /^\d+$/ ); 500 501 my $h = $self->db__()->prepare( # PROFILE BLOCK START 502 'select id from history 503 where history.id = ? and 504 history.committed = 1;' ); # PROFILE BLOCK STOP 505 $h->execute( $slot ); 506 my @row = $h->fetchrow_array; 507 508 return ( ( @row ) && ( $row[0] == $slot ) ); 509} 510 511#--------------------------------------------------------------------------- 512# 513# commit_history__ 514# 515# (private) Used internally to commit messages that have been committed 516# with a call to commit_slot to the database 517# 518#---------------------------------------------------------------------------- 519sub commit_history__ 520{ 521 my ( $self ) = @_; 522 523 if ( $#{$self->{commit_list__}} == -1 ) { 524 return; 525 } 526 527 my $update_history = $self->db__()->prepare( # PROFILE BLOCK START 528 'update history set hdr_from = ?, 529 hdr_to = ?, 530 hdr_date = ?, 531 hdr_cc = ?, 532 hdr_subject = ?, 533 sort_from = ?, 534 sort_to = ?, 535 sort_cc = ?, 536 committed = ?, 537 bucketid = ?, 538 usedtobe = ?, 539 magnetid = ?, 540 hash = ?, 541 size = ? 542 where id = ?;' ); # PROFILE BLOCK STOP 543 544 $self->db__()->begin_work; 545 foreach my $entry (@{$self->{commit_list__}}) { 546 my ( $session, $slot, $bucket, $magnet ) = @{$entry}; 547 548 my $file = $self->get_slot_file( $slot ); 549 550 # Committing to the history requires the following steps 551 # 552 # 1. Parse the message to extract the headers 553 # 2. Compute MD5 hash of Message-ID, Date and Subject 554 # 3. Update the related row with the headers and 555 # committed set to 1 556 557 my %header; 558 559 if ( open FILE, "<$file" ) { 560 my $last; 561 while ( <FILE> ) { 562 s/[\r\n]//g; 563 564 if ( /^$/ ) { 565 last; 566 } 567 568 if ( /^([^ \t]+):[ \t]*(.*)$/ ) { 569 $last = lc $1; 570 push @{$header{$last}}, $2; 571 572 } else { 573 if ( defined $last ) { 574 ${$header{$last}}[$#{$header{$last}}] .= $_; 575 } 576 } 577 } 578 close FILE; 579 } 580 else { 581 $self->log_( 0, "Could not open history message file $file for reading." ); 582 } 583 584 my $hash = $self->get_message_hash( ${$header{'message-id'}}[0], 585 ${$header{'date'}}[0], 586 ${$header{'subject'}}[0], 587 ${$header{'received'}}[0] ); 588 589 # For sorting purposes the From, To and CC headers have special 590 # cleaned up versions of themselves in the database. The idea 591 # is that case and certain characters should be ignored when 592 # sorting these fields 593 # 594 # "John Graham-Cumming" <spam@jgc.org> maps to 595 # john graham-cumming spam@jgc.org 596 597 my @sortable = ( 'from', 'to', 'cc' ); 598 my %sort_headers; 599 600 foreach my $h (@sortable) { 601 $sort_headers{$h} = # PROFILE BLOCK START 602 $self->{classifier__}->{parser__}->decode_string( 603 ${$header{$h}}[0] ); # PROFILE BLOCK STOP 604 $sort_headers{$h} = lc($sort_headers{$h} || ''); 605 $sort_headers{$h} =~ s/[\"<>]//g; 606 $sort_headers{$h} =~ s/^[ \t]+//g; 607 $sort_headers{$h} =~ s/\0//g; 608 } 609 610 # Make sure that the headers we are going to insert into 611 # the database have been defined and are suitably quoted 612 613 my @required = ( 'from', 'to', 'cc', 'subject' ); 614 615 foreach my $h (@required) { 616 ${$header{$h}}[0] = # PROFILE BLOCK START 617 $self->{classifier__}->{parser__}->decode_string( 618 ${$header{$h}}[0] ); # PROFILE BLOCK STOP 619 620 if ( !defined ${$header{$h}}[0] || ${$header{$h}}[0] =~ /^\s*$/ ) { 621 if ( $h ne 'cc' ) { 622 ${$header{$h}}[0] = "<$h header missing>"; 623 } else { 624 ${$header{$h}}[0] = ''; 625 } 626 } 627 628 ${$header{$h}}[0] =~ s/\0//g; 629 } 630 631 # If we do not have a date header then set the date to 632 # 0 (start of the Unix epoch), otherwise parse the string 633 # using Date::Parse to interpret it and turn it into the 634 # Unix epoch. 635 636 if ( !defined( ${$header{date}}[0] ) ) { 637 ${$header{date}}[0] = 0; 638 } else { 639 ${$header{date}}[0] = str2time( ${$header{date}}[0] ) || 0; 640 } 641 642 # Figure out the ID of the bucket this message has been 643 # classified into (and the same for the magnet if it is 644 # defined) 645 646 my $bucketid = $self->{classifier__}->get_bucket_id( # PROFILE BLOCK START 647 $session, $bucket ); # PROFILE BLOCK STOP 648 649 my $msg_size = -s $file; 650 651 # If we can't get the bucket ID because the bucket doesn't exist 652 # which could happen when we are upgrading the history which 653 # has old bucket names in it then we will remove the entry from the 654 # history and log the failure 655 656 if ( defined( $bucketid ) ) { 657 my $result = $update_history->execute( # PROFILE BLOCK START 658 ${$header{from}}[0], # hdr_from 659 ${$header{to}}[0], # hdr_to 660 ${$header{date}}[0], # hdr_date 661 ${$header{cc}}[0], # hdr_cc 662 ${$header{subject}}[0], # hdr_subject 663 $sort_headers{from}, # sort_from 664 $sort_headers{to}, # sort_to 665 $sort_headers{cc}, # sort_cc 666 1, # committed 667 $bucketid, # bucketid 668 0, # usedtobe 669 $magnet, # magnetid 670 $hash, # hash 671 $msg_size, # size 672 $slot # id 673 ); # PROFILE BLOCK STOP 674 } else { 675 $self->log_( 0, "Couldn't find bucket ID for bucket $bucket when committing $slot" ); 676 $self->release_slot( $slot ); 677 } 678 } 679 $self->db__()->commit; 680 $update_history->finish; 681 682 $self->{commit_list__} = (); 683 $self->force_requery(); 684} 685 686# --------------------------------------------------------------------------- 687# 688# delete_slot 689# 690# Deletes an entry from the database and disk, optionally archiving it 691# if the archive parameters have been set 692# 693# $slot The slot ID 694# $archive 1 if it's OK to archive this entry 695# 696# --------------------------------------------------------------------------- 697sub delete_slot 698{ 699 my ( $self, $slot, $archive ) = @_; 700 701 my $file = $self->get_slot_file( $slot ); 702 $self->log_( 2, "delete_slot called for slot $slot, file $file" ); 703 704 if ( $archive && $self->config_( 'archive' ) ) { 705 my $path = $self->get_user_path_( $self->config_( 'archive_dir' ), 0 ); 706 707 $self->make_directory__( $path ); 708 709 my $b = $self->db__()->selectrow_arrayref( 710 "select buckets.name from history, buckets 711 where history.bucketid = buckets.id and 712 history.id = $slot;" ); 713 714 my $bucket = $b->[0]; 715 716 if ( ( $bucket ne 'unclassified' ) && # PROFILE BLOCK START 717 ( $bucket ne 'unknown class' ) ) { # PROFILE BLOCK STOP 718 $path .= "\/" . $bucket; 719 $self->make_directory__( $path ); 720 721 if ( $self->config_( 'archive_classes' ) > 0) { 722 723 # Archive to a random sub-directory of the bucket archive 724 725 my $subdirectory = int( rand( # PROFILE BLOCK START 726 $self->config_( 'archive_classes' ) ) ); # PROFILE BLOCK STOP 727 $path .= "\/" . $subdirectory; 728 $self->make_directory__( $path ); 729 } 730 731 # Previous comment about this potentially being unsafe 732 # (may have placed messages in unusual places, or 733 # overwritten files) no longer applies. Files are now 734 # placed in the user directory, in the archive_dir 735 # subdirectory 736 737 $self->copy_file__( $file, $path, "popfile$slot.msg" ); 738 } 739 } 740 741 # Now remove the entry from the database, and the file from disk, 742 # and also invalidate the caches of any open queries since they 743 # may have been affected 744 745 $self->release_slot( $slot ); 746 $self->force_requery(); 747} 748 749#---------------------------------------------------------------------------- 750# 751# start_deleting 752# 753# Called before doing a block of calls to delete_slot. This will call 754# back into the Classifier::Bayes to tweak the database performance to 755# make this quick. 756# 757#---------------------------------------------------------------------------- 758sub start_deleting 759{ 760 my ( $self ) = @_; 761 762# $self->{classifier__}->tweak_sqlite( 1, 1, $self->db__() ); 763 $self->db__()->begin_work; 764} 765 766#---------------------------------------------------------------------------- 767# 768# stop_deleting 769# 770# Called after doing a block of calls to delete_slot. This will call 771# back into the Classifier::Bayes to untweak the database performance. 772# 773#---------------------------------------------------------------------------- 774sub stop_deleting 775{ 776 my ( $self ) = @_; 777 778 $self->db__()->commit; 779# $self->{classifier__}->tweak_sqlite( 1, 0, $self->db__() ); 780} 781 782#---------------------------------------------------------------------------- 783# 784# get_slot_file 785# 786# Used to map a slot ID to the full path of the file will contain 787# the message associated with the slot 788# 789#---------------------------------------------------------------------------- 790sub get_slot_file 791{ 792 my ( $self, $slot ) = @_; 793 794 # The mapping between the slot and the file goes as follows: 795 # 796 # 1. Convert the file to an 8 digit hex number (with leading 797 # zeroes). 798 # 2. Call that number aabbccdd 799 # 3. Build the path aa/bb/cc 800 # 4. Name the file popfiledd.msg 801 # 5. Add the msgdir location to obtain 802 # msgdir/aa/bb/cc/popfiledd.msg 803 # 804 # Hence each directory can have up to 256 entries 805 806 my $hex_slot = sprintf( '%8.8x', $slot ); 807 my $path = $self->get_user_path_( # PROFILE BLOCK START 808 $self->global_config_( 'msgdir' ) . 809 substr( $hex_slot, 0, 2 ) . '/', 0 ); # PROFILE BLOCK STOP 810 811 $self->make_directory__( $path ); 812 $path .= substr( $hex_slot, 2, 2 ) . '/'; 813 $self->make_directory__( $path ); 814 $path .= substr( $hex_slot, 4, 2 ) . '/'; 815 $self->make_directory__( $path ); 816 817 my $file = 'popfile' . # PROFILE BLOCK START 818 substr( $hex_slot, 6, 2 ) . '.msg'; # PROFILE BLOCK STOP 819 820 return $path . $file; 821} 822 823#---------------------------------------------------------------------------- 824# 825# get_message_hash 826# 827# Used to compute an MD5 hash of the headers of a message 828# so that the same message can later me identified by a 829# call to get_slot_from_hash 830# 831# messageid The message id header 832# date The date header 833# subject The subject header 834# received First Received header line 835# 836# Note that the values passed in are everything after the : in 837# header without the trailing \r or \n. If a header is missing 838# then pass in the empty string 839# 840#---------------------------------------------------------------------------- 841sub get_message_hash 842{ 843 my ( $self, $messageid, $date, $subject, $received ) = @_; 844 845 $messageid = '' if ( !defined( $messageid ) ); 846 $date = '' if ( !defined( $date ) ); 847 $subject = '' if ( !defined( $subject ) ); 848 $received = '' if ( !defined( $received ) ); 849 850 return md5_hex( "[$messageid][$date][$subject][$received]" ); 851} 852 853#---------------------------------------------------------------------------- 854# 855# get_slot_from_hash 856# 857# Given a hash value (returned by get_message_hash), find any 858# corresponding message in the database and return its slot 859# id. If the message does not exist then return the empty 860# string. 861# 862# hash The hash value 863# 864#---------------------------------------------------------------------------- 865sub get_slot_from_hash 866{ 867 my ( $self, $hash ) = @_; 868 869 my $h = $self->db__()->prepare( # PROFILE BLOCK START 870 'select id from history where hash = ? limit 1;' ); # PROFILE BLOCK STOP 871 $h->execute( $hash ); 872 my $result = $h->fetchrow_arrayref; 873 874 return defined( $result )?$result->[0]:''; 875} 876 877#---------------------------------------------------------------------------- 878# 879# QUERYING THE HISTORY 880# 881# 1. Start a query session by calling start_query and obtain a unique 882# ID 883# 884# 2. Set the query parameter (i.e. sort, search and filter) with a call 885# to set_query 886# 887# 3. Obtain the number of history rows returned by calling get_query_size 888# 889# 4. Get segments of the history returned by calling get_query_rows with 890# the start and end rows needed 891# 892# 5. When finished with the query call stop_query 893# 894#---------------------------------------------------------------------------- 895 896#---------------------------------------------------------------------------- 897# 898# start_query 899# 900# Used to start a query session, returns a unique ID for this 901# query. When the caller is done with the query they return 902# stop_query. 903# 904#---------------------------------------------------------------------------- 905sub start_query 906{ 907 my ( $self ) = @_; 908 909 # Think of a large random number, make sure that it hasn't 910 # been used and then return it 911 912 while (1) { 913 my $id = sprintf( '%8.8x', int(rand(4294967295)) ); 914 915 if ( !defined( $self->{queries__}{$id} ) ) { 916 $self->{queries__}{$id}{query} = 0; 917 $self->{queries__}{$id}{count} = 0; 918 $self->{queries__}{$id}{cache} = (); 919 return $id 920 } 921 } 922} 923 924#---------------------------------------------------------------------------- 925# 926# stop_query 927# 928# Used to clean up after a query session 929# 930# id The ID returned by start_query 931# 932#---------------------------------------------------------------------------- 933sub stop_query 934{ 935 my ( $self, $id ) = @_; 936 937 # If the cache size hasn't grown to the row 938 # count then we didn't fetch everything and so 939 # we fill call finish to clean up 940 941 my $q = $self->{queries__}{$id}{query}; 942 943 if ( ( defined $q ) && ( $q != 0 ) ) { 944 if ( $#{$self->{queries__}{$id}{cache}} != # PROFILE BLOCK START 945 $self->{queries__}{$id}{count} ) { # PROFILE BLOCK STOP 946 $q->finish; 947 undef $self->{queries__}{$id}{query}; 948 } 949 } 950 951 delete $self->{queries__}{$id}; 952} 953 954#---------------------------------------------------------------------------- 955# 956# set_query 957# 958# Called to set up a query with sort, filter and search options 959# 960# id The ID returned by start_query 961# filter Name of bucket to filter on 962# search From/Subject line to search for 963# sort The field to sort on (from, subject, to, cc, bucket, date) 964# (optional leading - for descending sort) 965# not If set to 1 negates the search 966# 967#---------------------------------------------------------------------------- 968sub set_query 969{ 970 my ( $self, $id, $filter, $search, $sort, $not ) = @_; 971 972 $search =~ s/\0//g; 973 $sort = '' if ( $sort !~ /^(\-)?(inserted|from|to|cc|subject|bucket|date|size)$/ ); 974 975 # If this query has already been done and is in the cache 976 # then do no work here 977 978 if ( defined( $self->{queries__}{$id}{fields} ) && # PROFILE BLOCK START 979 ( $self->{queries__}{$id}{fields} eq 980 "$filter:$search:$sort:$not" ) ) { # PROFILE BLOCK STOP 981 return; 982 } 983 984 $self->{queries__}{$id}{fields} = "$filter:$search:$sort:$not"; 985 986 # We do two queries, the first to get the total number of rows that 987 # would be returned and then we start the real query. This is done 988 # so that we know the size of the resulting data without having 989 # to retrieve it all 990 991 $self->{queries__}{$id}{base} = # PROFILE BLOCK START 992 'select XXX from history, buckets, magnets 993 where history.userid = 1 and committed = 1'; # PROFILE BLOCK STOP 994 995 $self->{queries__}{$id}{base} .= ' and history.bucketid = buckets.id'; 996 $self->{queries__}{$id}{base} .= ' and magnets.id = magnetid'; 997 998 # If there's a search portion then add the appropriate clause 999 # to find the from/subject header 1000 1001 my $not_word = $not ? 'not' : ''; 1002 my $not_equal = $not ? '!=' : '='; 1003 my $equal = $not ? '=' : '!='; 1004 1005 if ( $search ne '' ) { 1006 $search = $self->db__()->quote( '%' . $search . '%' ); 1007 $self->{queries__}{$id}{base} .= " and $not_word ( hdr_from like $search or hdr_subject like $search )"; 1008 } 1009 1010 # If there's a filter option then we'll need to get the bucket 1011 # id for the filtered bucket and add the appropriate clause 1012 1013 if ( $filter ne '' ) { 1014 if ( $filter eq '__filter__magnet' ) { 1015 $self->{queries__}{$id}{base} .= # PROFILE BLOCK START 1016 " and history.magnetid $equal 0"; # PROFILE BLOCK STOP 1017 } else { 1018 if ( $filter eq '__filter__reclassified' ) { 1019 $self->{queries__}{$id}{base} .= # PROFILE BLOCK START 1020 " and history.usedtobe $equal 0"; # PROFILE BLOCK STOP 1021 } else { 1022 my $bucket = $self->db__()->quote( $filter ); 1023 $self->{queries__}{$id}{base} .= 1024 " and buckets.name $not_equal $bucket"; 1025 } 1026 } 1027 } 1028 1029 # Add the sort option (if there is one) 1030 1031 if ( $sort ne '' ) { 1032 $sort =~ s/^(\-)//; 1033 my $direction = defined($1)?'desc':'asc'; 1034 if ( $sort eq 'bucket' ) { 1035 $sort = 'buckets.name'; 1036 } else { 1037 if ( $sort =~ /from|to|cc/ ) { 1038 $sort = "sort_$sort"; 1039 } else { 1040 if ( $sort ne 'inserted' && $sort ne 'size' ) { 1041 $sort = "hdr_$sort"; 1042 } 1043 } 1044 } 1045 $self->{queries__}{$id}{base} .= " order by $sort $direction;"; 1046 } else { 1047 $self->{queries__}{$id}{base} .= ' order by inserted desc;'; 1048 } 1049 1050 my $count = $self->{queries__}{$id}{base}; 1051 $self->log_( 2, "Base query is $count" ); 1052 $count =~ s/XXX/COUNT(*)/; 1053 1054 my $h = $self->db__()->prepare( $count ); 1055 $h->execute; 1056 $self->{queries__}{$id}{count} = $h->fetchrow_arrayref->[0]; 1057 $h->finish; 1058 1059 my $select = $self->{queries__}{$id}{base}; 1060 $select =~ s/XXX/$fields_slot/; 1061 $self->{queries__}{$id}{query} = $self->db__()->prepare( $select ); 1062 $self->{queries__}{$id}{cache} = (); 1063} 1064 1065#---------------------------------------------------------------------------- 1066# 1067# delete_query 1068# 1069# Called to delete all the rows returned in a query 1070# 1071# id The ID returned by start_query 1072# 1073#---------------------------------------------------------------------------- 1074sub delete_query 1075{ 1076 my ( $self, $id ) = @_; 1077 1078 $self->start_deleting(); 1079 1080 my $delete = $self->{queries__}{$id}{base}; 1081 $delete =~ s/XXX/history.id/; 1082 my $d = $self->db__()->prepare( $delete ); 1083 $d->execute; 1084 my $history_id; 1085 my @row; 1086 my @ids; 1087 $d->bind_columns( \$history_id ); 1088 while ( $d->fetchrow_arrayref ) { 1089 push ( @ids, $history_id ); 1090 } 1091 foreach my $id (@ids) { 1092 $self->delete_slot( $id, 1 ); 1093 } 1094 1095 $self->stop_deleting(); 1096} 1097 1098#---------------------------------------------------------------------------- 1099# 1100# get_query_size 1101# 1102# Called to return the number of elements in the query. 1103# Should only be called after a call to set_query. 1104# 1105# id The ID returned by start_query 1106# 1107#---------------------------------------------------------------------------- 1108sub get_query_size 1109{ 1110 my ( $self, $id ) = @_; 1111 1112 return $self->{queries__}{$id}{count}; 1113} 1114 1115#---------------------------------------------------------------------------- 1116# 1117# get_query_rows 1118# 1119# Returns the rows in the range [$start, $end) from a query that has 1120# already been set up with a call to set_query. The first row is row 1. 1121# 1122# id The ID returned by start_query 1123# start The first row to return 1124# count Number of rows to return 1125# 1126# Each row contains the fields: 1127# 1128# id (0), from (1), to (2), cc (3), subject (4), date (5), hash (6), 1129# inserted date (7), bucket name (8), reclassified id (9), bucket id (10), 1130# magnet value (11), size (12) 1131#---------------------------------------------------------------------------- 1132sub get_query_rows 1133{ 1134 my ( $self, $id, $start, $count ) = @_; 1135 1136 # First see if we have already retrieved these rows from the query 1137 # if we have then we can just return them from the cache. Otherwise 1138 # fetch the rows from the database and then return them 1139 1140 my $size = $#{$self->{queries__}{$id}{cache}}+1; 1141 1142 $self->log_( 2, "Request for rows $start ($count), current size $size" ); 1143 1144 if ( ( $size < ( $start + $count - 1 ) ) ) { 1145 my $rows = $start + $count - $size; 1146 $self->log_( 2, "Getting $rows rows from database" ); 1147 $self->{queries__}{$id}{query}->execute; 1148 $self->{queries__}{$id}{cache} = # PROFILE BLOCK START 1149 $self->{queries__}{$id}{query}->fetchall_arrayref( 1150 undef, $start + $count - 1 ); # PROFILE BLOCK STOP 1151 $self->{queries__}{$id}{query}->finish; 1152 } 1153 1154 my ( $from, $to ) = ( $start-1, $start+$count-2 ); 1155 1156 $self->log_( 2, "Returning $from..$to" ); 1157 1158 return @{$self->{queries__}{$id}{cache}}[$from..$to]; 1159} 1160 1161# --------------------------------------------------------------------------- 1162# 1163# make_directory__ 1164# 1165# Wrapper for mkdir that ensures that the path we are making doesn't end in 1166# / or \ (Done because your can't do mkdir 'foo/' on NextStep. 1167# 1168# $path The directory to make 1169# 1170# Returns whatever mkdir returns 1171# 1172# --------------------------------------------------------------------------- 1173sub make_directory__ 1174{ 1175 my ( $self, $path ) = @_; 1176 1177 $path =~ s/[\\\/]$//; 1178 1179 return 1 if ( -d $path ); 1180 return mkdir( $path ); 1181} 1182 1183# --------------------------------------------------------------------------- 1184# 1185# compare_mf__ 1186# 1187# Compares two mailfiles, used for sorting mail into order 1188# 1189# --------------------------------------------------------------------------- 1190sub compare_mf__ 1191{ 1192 $a =~ /popfile(\d+)=(\d+)\.msg/; 1193 my ( $ad, $am ) = ( $1, $2 ); 1194 1195 $b =~ /popfile(\d+)=(\d+)\.msg/; 1196 my ( $bd, $bm ) = ( $1, $2 ); 1197 1198 if ( $ad == $bd ) { 1199 return ( $bm <=> $am ); 1200 } else { 1201 return ( $bd <=> $ad ); 1202 } 1203} 1204 1205# --------------------------------------------------------------------------- 1206# 1207# upgrade_history_files__ 1208# 1209# Looks for old .MSG/.CLS history entries and sticks them in the database 1210# 1211# --------------------------------------------------------------------------- 1212sub upgrade_history_files__ 1213{ 1214 my ( $self ) = @_; 1215 1216 # See if there are any .MSG files in the msgdir, and if there are 1217 # upgrade them by placing them in the database 1218 1219 my @msgs = sort compare_mf__ glob $self->get_user_path_( # PROFILE BLOCK START 1220 $self->global_config_( 'msgdir' ) . 'popfile*.msg', 0 ); # PROFILE BLOCK STOP 1221 1222 if ( $#msgs != -1 ) { 1223 my $session = $self->{classifier__}->get_session_key( 'admin', '' ); 1224 1225 print "\nFound old history files, moving them into database\n "; 1226 1227 my $i = 0; 1228 $self->db__()->begin_work; 1229 foreach my $msg (@msgs) { 1230 if ( ( ++$i % 100 ) == 0 ) { 1231 print "[$i]"; 1232 flush STDOUT; 1233 } 1234 1235 # NOTE. We drop the information in $usedtobe, so that 1236 # reclassified messages will no longer appear reclassified 1237 # in upgraded history. Also the $magnet is ignored so 1238 # upgraded history will have no magnet information. 1239 1240 my ( $reclassified, $bucket, $usedtobe, $magnet ) = # PROFILE BLOCK START 1241 $self->history_read_class__( $msg ); # PROFILE BLOCK STOP 1242 1243 if ( $bucket ne 'unknown_class' ) { 1244 my ( $slot, $file ) = $self->reserve_slot(); 1245 rename $msg, $file; 1246 my @message = ( $session, $slot, $bucket, 0 ); 1247 push ( @{$self->{commit_list__}}, \@message ); 1248 } 1249 } 1250 $self->db__()->commit; 1251 1252 print "\nDone upgrading history\n"; 1253 1254 $self->commit_history__(); 1255 $self->{classifier__}->release_session_key( $session ); 1256 1257 unlink $self->get_user_path_( # PROFILE BLOCK START 1258 $self->global_config_( 'msgdir' ) . 'history_cache', 0 ); # PROFILE BLOCK STOP 1259 } 1260} 1261 1262# --------------------------------------------------------------------------- 1263# 1264# history_read_class__ - load and delete the class file for a message. 1265# 1266# returns: ( reclassified, bucket, usedtobe, magnet ) 1267# values: 1268# reclassified: boolean, true if message has been reclassified 1269# bucket: string, the bucket the message is in presently, 1270# unknown class if an error occurs 1271# usedtobe: string, the bucket the message used to be in 1272# (null if not reclassified) 1273# magnet: string, the magnet 1274# 1275# $filename The name of the message to load the class for 1276# 1277# --------------------------------------------------------------------------- 1278sub history_read_class__ 1279{ 1280 my ( $self, $filename ) = @_; 1281 1282 $filename =~ s/msg$/cls/; 1283 1284 my $reclassified = 0; 1285 my $bucket = 'unknown class'; 1286 my $usedtobe; 1287 my $magnet = ''; 1288 1289 if ( open CLASS, "<$filename" ) { 1290 $bucket = <CLASS>; 1291 if ( defined( $bucket ) && # PROFILE BLOCK START 1292 ( $bucket =~ /([^ ]+) MAGNET ([^\r\n]+)/ ) ) { # PROFILE BLOCK STOP 1293 $bucket = $1; 1294 $magnet = $2; 1295 } 1296 1297 $reclassified = 0; 1298 if ( defined( $bucket ) && ( $bucket =~ /RECLASSIFIED/ ) ) { 1299 $bucket = <CLASS>; 1300 $usedtobe = <CLASS>; 1301 $reclassified = 1; 1302 $usedtobe =~ s/[\r\n]//g; 1303 } 1304 close CLASS; 1305 $bucket =~ s/[\r\n]//g if defined( $bucket ); 1306 unlink $filename; 1307 } else { 1308 return ( undef, $bucket, undef, undef ); 1309 } 1310 1311 $bucket = 'unknown class' if ( !defined( $bucket ) ); 1312 1313 return ( $reclassified, $bucket, $usedtobe, $magnet ); 1314} 1315 1316#---------------------------------------------------------------------------- 1317# 1318# cleanup_history 1319# 1320# Removes the popfile*.msg files that are older than a number of days 1321# configured as history_days. 1322# 1323#---------------------------------------------------------------------------- 1324sub cleanup_history 1325{ 1326 my ( $self ) = @_; 1327 1328 my $seconds_per_day = 24 * 60 * 60; 1329 my $old = time - $self->config_( 'history_days' ) * $seconds_per_day; 1330 my @ids; 1331 my $d = $self->db__()->prepare( # PROFILE BLOCK START 1332 'select id from history 1333 where inserted < ?;' ); # PROFILE BLOCK STOP 1334 $d->execute( $old ); 1335 my $id; 1336 $d->bind_columns( \$id ); 1337 while ( $d->fetchrow_arrayref ) { 1338 push ( @ids, $id ); 1339 } 1340 $d->finish; 1341 foreach my $id (@ids) { 1342 $self->delete_slot( $id, 1 ); 1343 } 1344} 1345 1346# --------------------------------------------------------------------------- 1347# 1348# copy_file__ 1349# 1350# Utility to copy a file and ensure that the path it is going to 1351# exists 1352# 1353# $from Where to copy from 1354# $to_dir The directory it will be copied to 1355# $to_name The name of the destination (without the directory) 1356# 1357# --------------------------------------------------------------------------- 1358sub copy_file__ 1359{ 1360 my ( $self, $from, $to_dir, $to_name ) = @_; 1361 1362 if ( open( FROM, "<$from") ) { 1363 if ( open( TO, ">$to_dir\/$to_name") ) { 1364 binmode FROM; 1365 binmode TO; 1366 while (<FROM>) { 1367 print TO $_; 1368 } 1369 close TO; 1370 } 1371 1372 close FROM; 1373 } 1374} 1375 1376# --------------------------------------------------------------------------- 1377# 1378# force_requery 1379# 1380# Called when the database has changed to invalidate any queries that are 1381# open so that cached data is not returned and the database is requeried 1382# 1383# --------------------------------------------------------------------------- 1384sub force_requery 1385{ 1386 my ( $self ) = @_; 1387 # Force requery since the messages have changed 1388 1389 foreach my $id (keys %{$self->{queries__}}) { 1390 $self->{queries__}{$id}{fields} = ''; 1391 } 1392} 1393 1394# SETTER 1395 1396sub classifier 1397{ 1398 my ( $self, $classifier ) = @_; 1399 1400 $self->{classifier__} = $classifier; 1401} 1402 14031; 1404