/* spmfilter - mail filtering framework
* Copyright (C) 2009-2012 Axel Steiner, Werner Detter and SpaceNet AG
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _GNU_SOURCE
#include
#include
#include
#include
#include
#include
#include
#include "smf_settings.h"
#include "smf_session.h"
#include "smf_trace.h"
#include "smf_lookup.h"
#include "smf_lookup_sql.h"
#include "smf_core.h"
#include "smf_dict.h"
#include "smf_list.h"
#include "smf_internal.h"
#define THIS_MODULE "lookup_sql"
/*
 * Error callback for the database layer: writes the supplied message
 * to the trace log at error level. It is installed on the connection
 * pool for every driver except sqlite.
 *
 * error: text describing the failure
 */
void smf_lookup_sql_abort_handler(const char *error)
{
    TRACE(TRACE_ERR, "%s", error);
}
/*
 * Pick one entry of settings->sql_host at random.
 *
 * The generator is re-seeded with the current time on every call, so
 * calls made within the same second select the same host.
 *
 * settings: settings object holding the sql_host list (must not be NULL)
 *
 * Returns the data pointer stored in the chosen list element (not a
 * copy; the caller must not free it), or NULL when the host list is
 * missing or empty.
 */
char *smf_lookup_sql_get_rand_host(SMFSettings_T *settings) {
    SMFListElem_T *e = NULL;
    int size = 0;
    int target = 0;
    int idx = 0;

    assert(settings);

    TRACE(TRACE_DEBUG,"trying to get random sql server");

    if (settings->sql_host == NULL)
        return NULL;

    /* an empty list would otherwise end up as a modulo by zero */
    size = smf_list_size(settings->sql_host);
    if (size <= 0)
        return NULL;

    srand(time(NULL));
    target = rand() % size;

    /* target is a zero-based index, so compare before advancing */
    for (e = smf_list_head(settings->sql_host); e != NULL; e = e->next) {
        if (idx == target)
            return (char *)smf_list_data(e);
        idx++;
    }

    return NULL;
}
/*
 * Release a database connection by calling Connection_close() on it,
 * after logging that the connection is being returned to the pool.
 *
 * c: connection handle to release
 */
void smf_lookup_sql_con_close(Connection_T c)
{
    TRACE(TRACE_LOOKUP,"returning connection to pool");
    Connection_close(c);
}
/*
 * Build the connection URL for the configured SQL backend:
 *
 *   driver://[host][:port](/name | name-for-sqlite)[?user=..&password=..&charset=..]
 *
 * Host selection when `host` is NULL and the driver is not sqlite:
 *   - backend_connection "balance": a random entry of sql_host
 *   - otherwise: the first entry of sql_host
 * For the sqlite driver a leading '~' in sql_name is replaced by the
 * value of the HOME environment variable; the settings object itself
 * is left untouched.
 *
 * settings: settings object (must not be NULL)
 * host:     explicit host to use, or NULL to choose one from settings
 *
 * Returns a newly allocated string the caller has to free(), or NULL
 * when no driver is configured or memory could not be allocated.
 */
char *smf_lookup_sql_get_dsn(SMFSettings_T *settings, char *host) {
    char *sdsn = NULL;
    const char *chosen_host = NULL;
    SMFListElem_T *e = NULL;
    int is_sqlite = 0;

    assert(settings);

    /* check the driver first so nothing is allocated on this error path */
    if (settings->sql_driver == NULL) {
        TRACE(TRACE_ERR,"error, no sql driver defined!");
        return NULL;
    }
    is_sqlite = (strcasecmp(settings->sql_driver,"sqlite") == 0);

    /* start with an empty string that smf_core_strcat_printf() can grow */
    sdsn = (char *)calloc(1,sizeof(char));
    if (sdsn == NULL)
        return NULL;

    smf_core_strcat_printf(&sdsn,"%s://",settings->sql_driver);

    if (host != NULL) {
        chosen_host = host;
    } else if (!is_sqlite) {
        if ((settings->backend_connection != NULL) &&
                (strcasecmp(settings->backend_connection,"balance") == 0)) {
            chosen_host = smf_lookup_sql_get_rand_host(settings);
        } else if (settings->sql_host != NULL) {
            e = smf_list_head(settings->sql_host);
            if (e != NULL)
                chosen_host = (const char *)smf_list_data(e);
        }
        if (chosen_host == NULL)
            TRACE(TRACE_ERR,"no sql host configured");
    }

    /* never hand a NULL pointer to a %s conversion */
    if (chosen_host != NULL)
        smf_core_strcat_printf(&sdsn,"%s",chosen_host);

    if (settings->sql_port)
        smf_core_strcat_printf(&sdsn,":%u",settings->sql_port);

    if (settings->sql_name) {
        if (is_sqlite) {
            const char *db_name = settings->sql_name;
            char *expanded = NULL;

            /* expand ~ in db name to HOME env variable */
            if (db_name[0] == '~') {
                const char *homedir = getenv("HOME");

                if (homedir == NULL) {
                    /* fall back to the unexpanded name */
                    TRACE(TRACE_ERR,"can't expand ~ in db name");
                } else if (asprintf(&expanded,"%s%s", homedir, db_name + 1) < 0) {
                    /* contents of expanded are undefined after a failure */
                    expanded = NULL;
                }
            }

            smf_core_strcat_printf(&sdsn, "%s", (expanded != NULL) ? expanded : db_name);
            free(expanded);
        } else {
            smf_core_strcat_printf(&sdsn,"/%s",settings->sql_name);
        }
    }

    if (settings->sql_user && strlen((const char *)settings->sql_user)) {
        smf_core_strcat_printf(&sdsn,"?user=%s", settings->sql_user);

        if (settings->sql_pass && strlen((const char *)settings->sql_pass))
            smf_core_strcat_printf(&sdsn,"&password=%s", settings->sql_pass);

        if (strcasecmp(settings->sql_driver,"mysql") == 0) {
            if (settings->sql_encoding && strlen((const char *)settings->sql_encoding))
                smf_core_strcat_printf(&sdsn,"&charset=%s", settings->sql_encoding);
        }
    }

    /* NOTE(review): the logged URL contains the password when one is set */
    TRACE(TRACE_LOOKUP,"sql db at url: [%s]", sdsn);

    return sdsn;
}
/*
 * Create and start a database connection pool for the given URL and
 * store it in settings->lookup_connection. An already existing lookup
 * connection is disconnected first.
 *
 * After the pool is started one connection is fetched and pinged to
 * verify that the database is reachable.
 *
 * settings: settings object (must not be NULL)
 * dsn:      connection URL (must not be NULL)
 *
 * Returns 0 on success, -1 on failure.
 */
int smf_lookup_sql_start_pool(SMFSettings_T *settings, char *dsn) {
    int sweep_interval = 60;
    Connection_T c = NULL;
    SMFSQLConnection_T *con = NULL;

    assert(settings);
    assert(dsn);

    /* drop a previous pool before building a new one */
    if (settings->lookup_connection != NULL)
        smf_lookup_sql_disconnect(settings);

    con = malloc(sizeof *con);
    if (con == NULL) {
        TRACE(TRACE_ERR,"out of memory while creating database connection");
        return -1;
    }

    con->pool = NULL;
    con->url = URL_new(dsn);
    if (con->url == NULL) {
        /* nothing has been published in settings yet, release locally */
        TRACE(TRACE_ERR,"error parsing database connection url");
        free(con);
        return -1;
    }

    settings->lookup_connection = (void *)con;

    if (!(con->pool = ConnectionPool_new(con->url))) {
        TRACE(TRACE_ERR,"error creating database connection pool");
        smf_lookup_sql_disconnect(settings);
        return -1;
    }

    if (settings->sql_max_connections > 0) {
        /* the initial pool size must not exceed the configured maximum */
        if (settings->sql_max_connections < (unsigned int)ConnectionPool_getInitialConnections(con->pool))
            ConnectionPool_setInitialConnections(con->pool, settings->sql_max_connections);
        ConnectionPool_setMaxConnections(con->pool, settings->sql_max_connections);
        TRACE(TRACE_LOOKUP,"database connection pool created with maximum connections of [%d]",settings->sql_max_connections);
    }

    ConnectionPool_setReaper(con->pool, sweep_interval);
    TRACE(TRACE_LOOKUP, "run a database connection reaper thread every [%d] seconds", sweep_interval);

    /* the abort handler is installed for every driver except sqlite */
    if ((settings->sql_driver == NULL) || (strcasecmp(settings->sql_driver,"sqlite") != 0))
        ConnectionPool_setAbortHandler(con->pool, smf_lookup_sql_abort_handler);

    ConnectionPool_start(con->pool);

    if (!(c = ConnectionPool_getConnection(con->pool))) {
        smf_lookup_sql_disconnect(settings);
        return -1;
    }

    if (Connection_ping(c) == 0) {
        /* give the test connection back before tearing the pool down */
        smf_lookup_sql_con_close(c);
        smf_lookup_sql_disconnect(settings);
        return -1;
    }

    smf_lookup_sql_con_close(c);

    TRACE(TRACE_LOOKUP, "database connection pool started with [%d] connections, max [%d]",
        ConnectionPool_getInitialConnections(con->pool), ConnectionPool_getMaxConnections(con->pool));

    return 0;
}
/*
 * Establish the SQL lookup connection for the given settings.
 *
 * A pool is first started with the default host selection. If that
 * fails, every entry of settings->sql_host is tried in list order
 * until one pool starts successfully.
 *
 * settings: settings object (must not be NULL)
 *
 * Returns 0 on success, a non-zero value if no pool could be started.
 */
int smf_lookup_sql_connect(SMFSettings_T *settings) {
    char *dsn = NULL;
    int ret = -1;
    SMFListElem_T *elem = NULL;

    assert(settings);

    /* a NULL dsn means no URL could be built; do not start a pool with it */
    dsn = smf_lookup_sql_get_dsn(settings, NULL);
    if (dsn != NULL)
        ret = smf_lookup_sql_start_pool(settings,dsn);

    if (ret != 0) {
        TRACE(TRACE_ERR,"failed to initialize sql pool\n");

        /* check failover connections */
        if (settings->sql_host != NULL)
            elem = smf_list_head(settings->sql_host);

        for (; elem != NULL; elem = elem->next) {
            free(dsn);
            dsn = smf_lookup_sql_get_dsn(settings, (char *)smf_list_data(elem));
            if (dsn == NULL)
                continue;

            ret = smf_lookup_sql_start_pool(settings,dsn);
            if (ret == 0)
                break;
        }
    }

    if (ret == 0) {
        TRACE(TRACE_LOOKUP,"successfully initialized sql pool\n");
    } else {
        TRACE(TRACE_LOOKUP,"failed initialized sql pool\n");
    }

    free(dsn);

    return ret;
}
/*
 * Stop and free the connection pool stored in
 * settings->lookup_connection, free its URL object and the connection
 * structure itself, and reset settings->lookup_connection to NULL.
 * Does nothing when no lookup connection is set.
 *
 * A connection object whose pool or URL was never created is handled
 * as well, so it is safe to call this on a half-initialised object.
 *
 * settings: settings object (must not be NULL)
 */
void smf_lookup_sql_disconnect(SMFSettings_T *settings) {
    SMFSQLConnection_T *con = NULL;

    assert(settings);

    if (settings->lookup_connection == NULL)
        return;

    con = (SMFSQLConnection_T *)settings->lookup_connection;
    TRACE(TRACE_LOOKUP,"closing database connection");

    /* the pool and URL routines must not be handed NULL objects */
    if (con->pool != NULL) {
        ConnectionPool_stop(con->pool);
        ConnectionPool_free(&con->pool);
    }
    if (con->url != NULL)
        URL_free(&con->url);

    free(con);
    settings->lookup_connection = NULL;
}
/*
 * Fetch a working connection from the pool.
 *
 * Up to 30 attempts are made. A connection that does not answer a ping
 * is closed again and the attempt counts as failed. After every failed
 * attempt the function sleeps for one second; on every fifth attempt a
 * warning is logged and the pool is asked to reap connections.
 *
 * pool: connection pool to take the connection from
 *
 * Returns the connection. If no connection could be obtained an error
 * is logged and assert() fires; when assertions are compiled out NULL
 * is returned instead.
 */
Connection_T smf_lookup_sql_get_connection(ConnectionPool_T pool) {
    int i=0, k=0;
    Connection_T c = NULL;

    while (i++<30) {
        c = ConnectionPool_getConnection(pool);
        if (c) {
            if (Connection_ping(c) == 1)
                break;

            /* forget the handle once closed, so a dead connection can
             * never be returned after the final attempt */
            Connection_close(c);
            c = NULL;
        }

        if ((i % 5) == 0) {
            TRACE(TRACE_WARNING, "Thread is having trouble obtaining a database connection. Try [%d]", i);
            k = ConnectionPool_reapConnections(pool);
            TRACE(TRACE_LOOKUP, "Database reaper closed [%d] stale connections", k);
        }

        sleep(1);
    }

    if (! c) {
        TRACE(TRACE_ERR,"[%p] can't get a database connection from the pool! max [%d] size [%d] active [%d]",
            (void *)pool,
            ConnectionPool_getMaxConnections(pool),
            ConnectionPool_size(pool),
            ConnectionPool_active(pool));
    }

    assert(c);
    TRACE(TRACE_LOOKUP,"[%p] got connection from pool", (void *)c);

    return c;
}
SMFList_T *smf_lookup_sql_query(SMFSettings_T *settings, SMFSession_T *session, const char *q, ...) {
SMFSQLConnection_T *con;
Connection_T c;
ResultSet_T r;
SMFList_T *result;
va_list ap;
char *query;
int i;
va_start(ap, q);
vasprintf(&query,q,ap);
va_end(ap);
smf_core_strstrip(query);
if (strlen(query) == 0) return NULL;
/* active connection? */
if (settings->lookup_connection == NULL)
if(smf_lookup_sql_connect(settings) != 0) return NULL;
con = (SMFSQLConnection_T *)settings->lookup_connection;
if (con->pool == NULL)
if (smf_lookup_sql_connect(settings) != 0) return NULL;
if (smf_list_new(&result,smf_internal_dict_list_destroy)!=0) {
return NULL;
} else {
c = smf_lookup_sql_get_connection(con->pool);
TRY
r = Connection_executeQuery(c, query,NULL);
CATCH(SQLException)
STRACE(TRACE_ERR,session->id,"SQL error: %s\n", Connection_getLastError(c));
return NULL;
END_TRY;
while (ResultSet_next(r)) {
SMFDict_T *d = smf_dict_new();
for (i=1; i <= ResultSet_getColumnCount(r); i++) {
int blob_size = 0;
char *c = (char *)ResultSet_getColumnName(r,i);
char *col_name = NULL;
col_name = strdup(c);
const void *data = ResultSet_getBlob(r, i, &blob_size);
smf_dict_set(d,col_name,data);
free(col_name);
}
if (smf_list_append(result,d) != 0) return NULL;
}
STRACE(TRACE_LOOKUP,session->id,"query [%s] returned [%d] rows", query, result->size);
}
free(query);
smf_lookup_sql_con_close(c);
/* if not persistent, close connection */
if (settings->lookup_persistent != 1)
smf_lookup_sql_disconnect(settings);
return result;
}