1 /* 2 * Copyright (C) 2008-2014 Codership Oy <info@codership.com> 3 * 4 * $Id$ 5 */ 6 /* 7 * This header defines generic communication layer 8 * which implements basic open/close/send/receive 9 * functions. Its purpose is to implement all 10 * functionality common to all group communication 11 * uses. Currently this amounts to action 12 * fragmentation/defragmentation and invoking backend 13 * functions. 14 * In the course of development it has become clear 15 * that such fuctionality must be collected in a 16 * separate layer. 17 * Application abstraction layer is based on this one 18 * and uses those functions for its own purposes. 19 */ 20 21 #ifndef _gcs_core_h_ 22 #define _gcs_core_h_ 23 24 #include "gcs.hpp" 25 #include "gcs_act.hpp" 26 #include "gcs_act_proto.hpp" 27 28 #include <galerautils.h> 29 30 #include <stdint.h> 31 #include <stdlib.h> 32 33 /* 'static' method to register configuration variables */ 34 extern bool 35 gcs_core_register (gu_config_t* conf); 36 37 struct gcs_core; 38 typedef struct gcs_core gcs_core_t; 39 40 /* 41 * Allocates context resources private to 42 * generic communicaton layer - send/recieve buffers and the like. 43 */ 44 extern gcs_core_t* 45 gcs_core_create (gu_config_t* conf, 46 gcache_t* cache, 47 const char* node_name, 48 const char* inc_addr, 49 int repl_proto_ver, 50 int appl_proto_ver); 51 52 /* initializes action history (global seqno, group UUID). See gcs.h */ 53 extern long 54 gcs_core_init (gcs_core_t* core, gcs_seqno_t seqno, const gu_uuid_t* uuid); 55 56 /* 57 * gcs_core_open() opens connection 58 * Return values: 59 * zero - success 60 * negative - error code 61 */ 62 extern long 63 gcs_core_open (gcs_core_t* conn, 64 const char* channel, 65 const char* url, 66 bool bootstrap); 67 68 69 /* 70 * gcs_core_close() puts connection in a closed state, 71 * cancelling all ongoing calls. 72 * Return values: 73 * zero - success 74 * negative - error code 75 */ 76 extern long 77 gcs_core_close (gcs_core_t* conn); 78 79 /* 80 * gcs_core_destroy() frees resources allocated by gcs_core_create() 81 * Return values: 82 * zero - success 83 * negative - error code 84 */ 85 extern long 86 gcs_core_destroy (gcs_core_t* conn); 87 88 /* 89 * gcs_core_send() atomically sends action to group. 90 * 91 * NOT THREAD SAFE! Access should be serialized. 92 * 93 * Return values: 94 * non-negative - amount of action bytes sent (sans headers) 95 * negative - error code 96 * -EAGAIN - operation should be retried 97 * -ENOTCONN - connection to primary component lost 98 * 99 * NOTE: Successful return code here does not guarantee delivery to group. 100 * The real status of action is determined only in gcs_core_recv() call. 101 */ 102 extern ssize_t 103 gcs_core_send (gcs_core_t* core, 104 const struct gu_buf* act, 105 size_t act_size, 106 gcs_act_type_t act_type); 107 108 /* 109 * gcs_core_recv() blocks until some action is received from group. 110 * 111 * @param repl_buf ptr to replicated action local buffer (NULL otherwise) 112 * @param timeout absolute timeout date (as in pthread_cond_timedwait()) 113 * 114 * Return values: 115 * non-negative - the size of action received 116 * negative - error code 117 * 118 * @retval -ETIMEDOUT means no messages were received until timeout. 119 * 120 * NOTE: Action status (replicated or not) is carried in act_id. E.g. -ENOTCONN 121 * means connection to primary component was lost while sending, 122 * -ERESTART means that action delivery was interrupted and it must be 123 * resent. 124 */ 125 extern ssize_t 126 gcs_core_recv (gcs_core_t* conn, 127 struct gcs_act_rcvd* recv_act, 128 long long timeout); 129 130 /* group protocol version */ 131 extern gcs_proto_t 132 gcs_core_group_protocol_version (const gcs_core_t* conn); 133 134 /* Configuration functions */ 135 /* Sets maximum message size to achieve requested network packet size. 136 * In case of failure returns negative error code, in case of success - 137 * resulting message payload size (size of action fragment) */ 138 extern int 139 gcs_core_set_pkt_size (gcs_core_t* conn, int pkt_size); 140 141 /* sends this node's last applied value to group */ 142 extern long 143 gcs_core_set_last_applied (gcs_core_t* core, gcs_seqno_t seqno); 144 145 /* sends status of the ended snapshot (snapshot seqno or error code) */ 146 extern long 147 gcs_core_send_join (gcs_core_t* core, gcs_seqno_t seqno); 148 149 /* sends SYNC notice, seqno currently has no meaning */ 150 extern long 151 gcs_core_send_sync (gcs_core_t* core, gcs_seqno_t seqno); 152 153 /* sends flow control message */ 154 extern long 155 gcs_core_send_fc (gcs_core_t* core, const void* fc, size_t fc_size); 156 157 extern long 158 gcs_core_caused (gcs_core_t* core, gcs_seqno_t& seqno); 159 160 extern long 161 gcs_core_param_set (gcs_core_t* core, const char* key, const char* value); 162 163 extern const char* 164 gcs_core_param_get (gcs_core_t* core, const char* key); 165 166 void gcs_core_get_status(gcs_core_t* core, gu::Status& status); 167 168 #ifdef GCS_CORE_TESTING 169 170 /* gcs_core_send() interface does not allow enough concurrency control to model 171 * various race conditions for unit testing - it is not atomic. The functions 172 * below expose gcs_core unit internals solely for the purpose of testing */ 173 174 #include "gcs_msg_type.hpp" 175 #include "gcs_backend.hpp" 176 177 extern gcs_backend_t* 178 gcs_core_get_backend (gcs_core_t* core); 179 180 // switches lock-step mode on/off 181 extern void 182 gcs_core_send_lock_step (gcs_core_t* core, bool enable); 183 184 // step through action send process (send another fragment). 185 // returns positive number if there was a send thread waiting for it. 186 extern long 187 gcs_core_send_step (gcs_core_t* core, long timeout_ms); 188 189 extern void 190 gcs_core_set_state_uuid (gcs_core_t* core, const gu_uuid_t* uuid); 191 192 #include "gcs_group.hpp" 193 extern const gcs_group_t* 194 gcs_core_get_group (const gcs_core_t* core); 195 196 #include "gcs_fifo_lite.hpp" 197 extern gcs_fifo_lite_t* 198 gcs_core_get_fifo (gcs_core_t* core); 199 200 #endif /* GCS_CORE_TESTING */ 201 202 #endif /* _gcs_core_h_ */ 203