1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 
22 #include "db.h"
23 #include "log.h"
24 #include "daemon.h"
25 #include "zbxself.h"
26 #include "sighandler.h"
27 
28 #include "dbcache.h"
29 #include "dbsyncer.h"
30 #include "export.h"
31 
/* idle time (seconds) between sync iterations when the last sync reported no more pending data */
extern int		CONFIG_HISTSYNCER_FREQUENCY;
/* process_type is set from the thread arguments on start; program_type is only read in this file */
extern unsigned char	process_type, program_type;
/* both are set from the thread arguments on start */
extern int		server_num, process_num;
/* signal mask passed to zbx_block_signals()/zbx_unblock_signals() around database calls; */
/* presumably holds the original mask to be restored - confirm against sighandler */
static sigset_t		orig_mask;
36 
37 /******************************************************************************
38  *                                                                            *
39  * Function: zbx_db_flush_timer_queue                                         *
40  *                                                                            *
41  * Purpose: flush timer queue to the database                                 *
42  *                                                                            *
43  ******************************************************************************/
zbx_db_flush_timer_queue(void)44 static void	zbx_db_flush_timer_queue(void)
45 {
46 	int			i;
47 	zbx_vector_ptr_t	persistent_timers;
48 	zbx_db_insert_t		db_insert;
49 
50 	zbx_vector_ptr_create(&persistent_timers);
51 	zbx_dc_clear_timer_queue(&persistent_timers);
52 
53 	if (0 != persistent_timers.values_num)
54 	{
55 		zbx_db_insert_prepare(&db_insert, "trigger_queue", "trigger_queueid", "objectid", "type", "clock", "ns", NULL);
56 
57 		for (i = 0; i < persistent_timers.values_num; i++)
58 		{
59 			zbx_trigger_timer_t	*timer = (zbx_trigger_timer_t *)persistent_timers.values[i];
60 
61 			zbx_db_insert_add_values(&db_insert, __UINT64_C(0), timer->objectid, timer->type,
62 					timer->eval_ts.sec, timer->eval_ts.ns);
63 		}
64 
65 		zbx_dc_free_timers(&persistent_timers);
66 
67 		zbx_db_insert_autoincrement(&db_insert, "trigger_queueid");
68 		zbx_db_insert_execute(&db_insert);
69 		zbx_db_insert_clean(&db_insert);
70 	}
71 
72 	zbx_vector_ptr_destroy(&persistent_timers);
73 }
74 
/******************************************************************************
 *                                                                            *
 * Function: db_trigger_queue_cleanup                                         *
 *                                                                            *
 * Purpose: remove all rows from trigger_queue table and unlock the queue     *
 *                                                                            *
 ******************************************************************************/
static void	db_trigger_queue_cleanup(void)
{
	/* the result of the delete statement is not checked */
	DBexecute("delete from trigger_queue");

	/* the queue is unlocked only after the delete statement has been issued */
	zbx_db_trigger_queue_unlock();
}
80 
81 /******************************************************************************
82  *                                                                            *
83  * Function: main_dbsyncer_loop                                               *
84  *                                                                            *
85  * Purpose: periodically synchronises data in memory cache with database      *
86  *                                                                            *
87  * Author: Alexei Vladishev                                                   *
88  *                                                                            *
89  * Comments: never returns                                                    *
90  *                                                                            *
91  ******************************************************************************/
ZBX_THREAD_ENTRY(dbsyncer_thread,args)92 ZBX_THREAD_ENTRY(dbsyncer_thread, args)
93 {
94 	int		sleeptime = -1, total_values_num = 0, values_num, more, total_triggers_num = 0, triggers_num;
95 	double		sec, total_sec = 0.0;
96 	time_t		last_stat_time;
97 	char		*stats = NULL;
98 	const char	*process_name;
99 	size_t		stats_alloc = 0, stats_offset = 0;
100 
101 	process_type = ((zbx_thread_args_t *)args)->process_type;
102 	server_num = ((zbx_thread_args_t *)args)->server_num;
103 	process_num = ((zbx_thread_args_t *)args)->process_num;
104 
105 	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type), server_num,
106 			(process_name = get_process_type_string(process_type)), process_num);
107 
108 	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
109 
110 #define STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
111 				/* once in STAT_INTERVAL seconds */
112 
113 	zbx_setproctitle("%s #%d [connecting to the database]", process_name, process_num);
114 	last_stat_time = time(NULL);
115 
116 	zbx_strcpy_alloc(&stats, &stats_alloc, &stats_offset, "started");
117 
118 	/* database APIs might not handle signals correctly and hang, block signals to avoid hanging */
119 	zbx_block_signals(&orig_mask);
120 	DBconnect(ZBX_DB_CONNECT_NORMAL);
121 
122 	if (1 == process_num)
123 		db_trigger_queue_cleanup();
124 
125 	zbx_unblock_signals(&orig_mask);
126 
127 	if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY))
128 		zbx_history_export_init("history-syncer", process_num);
129 
130 	if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS))
131 		zbx_trends_export_init("history-syncer", process_num);
132 
133 	if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS))
134 		zbx_problems_export_init("history-syncer", process_num);
135 
136 	for (;;)
137 	{
138 		sec = zbx_time();
139 		zbx_update_env(sec);
140 
141 		if (0 != sleeptime)
142 			zbx_setproctitle("%s #%d [%s, syncing history]", process_name, process_num, stats);
143 
144 		/* clear timer trigger queue to avoid processing time triggers at exit */
145 		if (!ZBX_IS_RUNNING())
146 			zbx_log_sync_history_cache_progress();
147 
148 		/* database APIs might not handle signals correctly and hang, block signals to avoid hanging */
149 		zbx_block_signals(&orig_mask);
150 		zbx_sync_history_cache(&values_num, &triggers_num, &more);
151 
152 		if (!ZBX_IS_RUNNING() && SUCCEED != zbx_db_trigger_queue_locked())
153 			zbx_db_flush_timer_queue();
154 
155 		zbx_unblock_signals(&orig_mask);
156 
157 		total_values_num += values_num;
158 		total_triggers_num += triggers_num;
159 		total_sec += zbx_time() - sec;
160 
161 		sleeptime = (ZBX_SYNC_MORE == more ? 0 : CONFIG_HISTSYNCER_FREQUENCY);
162 
163 		if (0 != sleeptime || STAT_INTERVAL <= time(NULL) - last_stat_time)
164 		{
165 			stats_offset = 0;
166 			zbx_snprintf_alloc(&stats, &stats_alloc, &stats_offset, "processed %d values", total_values_num);
167 
168 			if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
169 			{
170 				zbx_snprintf_alloc(&stats, &stats_alloc, &stats_offset, ", %d triggers",
171 						total_triggers_num);
172 			}
173 
174 			zbx_snprintf_alloc(&stats, &stats_alloc, &stats_offset, " in " ZBX_FS_DBL " sec", total_sec);
175 
176 			if (0 == sleeptime)
177 				zbx_setproctitle("%s #%d [%s, syncing history]", process_name, process_num, stats);
178 			else
179 				zbx_setproctitle("%s #%d [%s, idle %d sec]", process_name, process_num, stats, sleeptime);
180 
181 			total_values_num = 0;
182 			total_triggers_num = 0;
183 			total_sec = 0.0;
184 			last_stat_time = time(NULL);
185 		}
186 
187 		if (ZBX_SYNC_MORE == more)
188 			continue;
189 
190 		if (!ZBX_IS_RUNNING())
191 			break;
192 
193 		zbx_sleep_loop(sleeptime);
194 	}
195 
196 	/* database APIs might not handle signals correctly and hang, block signals to avoid hanging */
197 	zbx_block_signals(&orig_mask);
198 	if (SUCCEED != zbx_db_trigger_queue_locked())
199 		zbx_db_flush_timer_queue();
200 
201 	DBclose();
202 	zbx_unblock_signals(&orig_mask);
203 
204 	zbx_log_sync_history_cache_progress();
205 
206 	zbx_free(stats);
207 
208 	exit(EXIT_SUCCESS);
209 #undef STAT_INTERVAL
210 }
211