44 #include <qb/qbdefs.h> 45 #include <qb/qblist.h> 46 #include <qb/qbutil.h> 47 #include <qb/qbloop.h> 48 #include <qb/qbipcs.h> 67 static int32_t ipc_not_enough_fds_left = 0;
/* Flow-control input: compared against 1 in cs_ipcs_check_for_flow_control(). */
68 static int32_t ipc_fc_is_quorate;
/* Flow-control input: assigned from the level argument of cs_ipcs_totem_queue_level_changed(). */
69 static int32_t ipc_fc_totem_queue_level;
/* Flow-control input: assigned from sync_in_process and compared against 0 when flow control is re-evaluated. */
70 static int32_t ipc_fc_sync_in_process;
/* Connection gate: starts at 0, assigned from an allow argument, and tested in cs_ipcs_connection_accept(). */
71 static int32_t ipc_allow_connections = 0;
/*
 * CS_IPCS_MAPPER_SERV_NAME (256) sizes the char name[] buffer of a mapper entry.
 * The prototypes that follow forward-declare the four handlers referenced by the
 * corosync_poll_funcs table (job_add, dispatch_add, dispatch_mod, dispatch_del)
 * and outq_flush(), which takes a single opaque data pointer.
 */
73 #define CS_IPCS_MAPPER_SERV_NAME 256 89 static int32_t cs_ipcs_job_add(
enum qb_loop_priority p,
void *data, qb_loop_job_dispatch_fn fn);
90 static int32_t cs_ipcs_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t events,
91 void *data, qb_ipcs_dispatch_fn_t fn);
92 static int32_t cs_ipcs_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t events,
93 void *data, qb_ipcs_dispatch_fn_t fn);
94 static int32_t cs_ipcs_dispatch_del(int32_t fd);
95 static void outq_flush (
void *data);
/*
 * Poll-handler table: binds each qb_ipcs_poll_handlers slot to the matching
 * cs_ipcs_* function declared in this file. Its address is passed to
 * qb_ipcs_poll_handlers_set() for each IPC service instance further down.
 */
98 static struct qb_ipcs_poll_handlers corosync_poll_funcs = {
99 .job_add = cs_ipcs_job_add,
100 .dispatch_add = cs_ipcs_dispatch_add,
101 .dispatch_mod = cs_ipcs_dispatch_mod,
102 .dispatch_del = cs_ipcs_dispatch_del,
/*
 * Forward declarations of the per-connection callbacks that are installed in
 * the corosync_service_funcs table (accept, created, msg_process, closed,
 * destroyed). Each receives the qb_ipcs_connection_t it applies to.
 */
105 static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid);
106 static void cs_ipcs_connection_created(qb_ipcs_connection_t *c);
107 static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
108 void *data,
size_t size);
109 static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c);
110 static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c);
/*
 * Service-handler table: binds each qb_ipcs_service_handlers slot to the
 * matching cs_ipcs_* callback. Its address is the final argument of the
 * qb_ipcs_create() call made when a service's IPC instance is set up.
 */
112 static struct qb_ipcs_service_handlers corosync_service_funcs = {
113 .connection_accept = cs_ipcs_connection_accept,
114 .connection_created = cs_ipcs_connection_created,
115 .msg_process = cs_ipcs_msg_process,
116 .connection_closed = cs_ipcs_connection_closed,
117 .connection_destroyed = cs_ipcs_connection_destroyed,
120 static const char* cs_ipcs_serv_short_name(int32_t service_id)
123 switch (service_id) {
157 ipc_allow_connections = allow;
162 if (ipcs_mapper[service_id].
inst) {
163 qb_ipcs_destroy(ipcs_mapper[service_id].inst);
164 ipcs_mapper[service_id].
inst = NULL;
169 static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid)
171 int32_t service = qb_ipcs_service_id_get(c);
175 if (!ipc_allow_connections) {
181 ipcs_mapper[service].
inst == NULL) {
185 if (ipc_not_enough_fds_left) {
189 if (euid == 0 || egid == 0) {
206 static char * pid_to_name (pid_t pid,
char *out_name,
size_t name_len)
214 snprintf (fname, 32,
"/proc/%d/stat", pid);
215 fp = fopen (fname,
"r");
220 if (fgets (buf,
sizeof (buf), fp) == NULL) {
226 name = strrchr (buf,
'(');
234 rest = strrchr (buf,
')');
236 if (rest == NULL || rest[1] !=
' ') {
245 strncpy (out_name, name, name_len);
246 out_name[name_len - 1] =
'\0';
261 static void cs_ipcs_connection_created(qb_ipcs_connection_t *c)
266 struct qb_ipcs_connection_stats stats;
269 int set_client_pid = 0;
270 int set_proc_name = 0;
274 service = qb_ipcs_service_id_get(c);
277 context = calloc(1, size);
278 if (context == NULL) {
279 qb_ipcs_disconnect(c);
288 qb_ipcs_context_set(c, context);
291 log_printf(LOG_ERR,
"lib_init_fn failed, disconnecting");
292 qb_ipcs_disconnect(c);
297 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
299 if (stats.client_pid > 0) {
300 if (pid_to_name (stats.client_pid, proc_name,
sizeof(proc_name))) {
302 proc_name, stats.client_pid, c);
306 stats.client_pid, c);
317 qb_ipcs_disconnect(c);
327 if (set_client_pid) {
369 qb_ipcs_connection_ref(conn);
374 qb_ipcs_connection_unref(conn);
380 cnx = qb_ipcs_context_get(conn);
381 return &cnx->
data[0];
384 static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
392 context = qb_ipcs_context_get(c);
395 list != &context->
outq_head; list = list_next) {
397 list_next = list->
next;
398 outq_item =
list_entry (list,
struct outq_item, list);
401 free (outq_item->
msg);
408 static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c)
411 int32_t service = qb_ipcs_service_id_get(c);
414 const char *key_name;
425 cnx = qb_ipcs_context_get(c);
442 const struct iovec *iov,
443 unsigned int iov_len)
445 int32_t rc = qb_ipcs_response_sendv(conn, iov, iov_len);
454 int32_t rc = qb_ipcs_response_send(conn, msg, mlen);
461 static void outq_flush (
void *
data)
463 qb_ipcs_connection_t *conn =
data;
470 list != &context->
outq_head; list = list_next) {
472 list_next = list->
next;
473 outq_item =
list_entry (list,
struct outq_item, list);
475 rc = qb_ipcs_event_send(conn, outq_item->
msg, outq_item->
mlen);
476 if (rc < 0 && rc != -EAGAIN) {
478 qb_perror(LOG_ERR,
"qb_ipcs_event_send");
480 }
else if (rc == -EAGAIN) {
483 assert(rc == outq_item->
mlen);
488 free (outq_item->
msg);
502 static void msg_send_or_queue(qb_ipcs_connection_t *conn,
const struct iovec *iov, uint32_t iov_len)
506 int32_t bytes_msg = 0;
511 for (i = 0; i < iov_len; i++) {
512 bytes_msg += iov[i].iov_len;
516 assert(list_empty (&context->
outq_head));
517 rc = qb_ipcs_event_sendv(conn, iov, iov_len);
518 if (rc == bytes_msg) {
532 outq_item = malloc (
sizeof (
struct outq_item));
533 if (outq_item == NULL) {
534 qb_ipcs_disconnect(conn);
537 outq_item->
msg = malloc (bytes_msg);
538 if (outq_item->
msg == NULL) {
540 qb_ipcs_disconnect(conn);
544 write_buf = outq_item->
msg;
545 for (i = 0; i < iov_len; i++) {
546 memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
547 write_buf += iov[i].iov_len;
549 outq_item->
mlen = bytes_msg;
550 list_init (&outq_item->
list);
558 iov.iov_base = (
void *)msg;
560 msg_send_or_queue (conn, &iov, 1);
565 const struct iovec *iov,
566 unsigned int iov_len)
568 msg_send_or_queue(conn, iov, iov_len);
572 static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
573 void *data,
size_t size)
575 struct qb_ipc_response_header response;
576 struct qb_ipc_request_header *request_pt = (
struct qb_ipc_request_header *)data;
577 int32_t service = qb_ipcs_service_id_get(c);
579 int32_t is_async_call = QB_FALSE;
581 int sending_allowed_private_data;
587 &sending_allowed_private_data);
589 is_async_call = (service ==
CPG_SERVICE && request_pt->id == 2);
595 if (send_ok == -EINVAL) {
596 response.size =
sizeof (response);
600 cnx = qb_ipcs_context_get(c);
607 __func__, response.size, response.error);
609 qb_ipcs_response_send (c,
614 }
else if (send_ok < 0) {
615 cnx = qb_ipcs_context_get(c);
619 if (!is_async_call) {
623 response.size =
sizeof (response);
626 qb_ipcs_response_send (c,
631 "*** %s() (%d:%d - %d) %s!",
632 __func__, service, request_pt->id,
633 is_async_call, strerror(-send_ok));
647 static int32_t cs_ipcs_job_add(
enum qb_loop_priority p,
void *data, qb_loop_job_dispatch_fn fn)
652 static int32_t cs_ipcs_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t events,
653 void *data, qb_ipcs_dispatch_fn_t fn)
658 static int32_t cs_ipcs_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t events,
659 void *data, qb_ipcs_dispatch_fn_t fn)
664 static int32_t cs_ipcs_dispatch_del(int32_t fd)
669 static void cs_ipcs_low_fds_event(int32_t not_enough, int32_t fds_available)
671 ipc_not_enough_fds_left = not_enough;
684 return ipc_fc_totem_queue_level;
687 static qb_loop_timer_handle ipcs_check_for_flow_control_timer;
688 static void cs_ipcs_check_for_flow_control(
void)
697 fc_enabled = QB_IPCS_RATE_OFF;
698 if (ipc_fc_is_quorate == 1 ||
705 ipc_fc_sync_in_process == 0) {
706 fc_enabled = QB_FALSE;
713 fc_enabled = QB_FALSE;
715 fc_enabled = QB_IPCS_RATE_OFF_2;
719 qb_ipcs_request_rate_limit(ipcs_mapper[i].
inst, fc_enabled);
724 qb_ipcs_request_rate_limit(ipcs_mapper[i].
inst, QB_IPCS_RATE_FAST);
726 qb_ipcs_request_rate_limit(ipcs_mapper[i].
inst, QB_IPCS_RATE_NORMAL);
728 qb_ipcs_request_rate_limit(ipcs_mapper[i].
inst, QB_IPCS_RATE_SLOW);
733 static void cs_ipcs_fc_quorum_changed(
int quorate,
void *context)
736 cs_ipcs_check_for_flow_control();
739 static void cs_ipcs_totem_queue_level_changed(
enum totem_q_level level)
741 ipc_fc_totem_queue_level = level;
742 cs_ipcs_check_for_flow_control();
747 ipc_fc_sync_in_process = sync_in_process;
748 cs_ipcs_check_for_flow_control();
754 struct qb_ipcs_stats srv_stats;
755 struct qb_ipcs_connection_stats stats;
756 qb_ipcs_connection_t *c, *prev;
764 qb_ipcs_stats_get(ipcs_mapper[i].
inst, &srv_stats, QB_FALSE);
766 for (c = qb_ipcs_connection_first_get(ipcs_mapper[i].inst);
768 prev = c, c = qb_ipcs_connection_next_get(ipcs_mapper[i].inst, prev), qb_ipcs_connection_unref(prev)) {
770 cnx = qb_ipcs_context_get(c);
771 if (cnx == NULL)
continue;
773 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
811 static enum qb_ipc_type cs_get_ipc_type (
void)
815 enum qb_ipc_type ret = QB_IPC_NATIVE;
819 return QB_IPC_NATIVE;
822 if (strcmp(str,
"native") == 0) {
827 if (strcmp(str,
"shm") == 0) {
832 if (strcmp(str,
"socket") == 0) {
850 const char *serv_short_name;
852 serv_short_name = cs_ipcs_serv_short_name(service->
id);
856 "NOT Initializing IPC on %s [%d]",
864 return "qb_ipcs_run error";
867 ipcs_mapper[service->
id].
id = service->
id;
868 strcpy(ipcs_mapper[service->
id].
name, serv_short_name);
870 "Initializing IPC on %s [%d]",
871 ipcs_mapper[service->
id].
name,
872 ipcs_mapper[service->
id].
id);
873 ipcs_mapper[service->
id].
inst = qb_ipcs_create(ipcs_mapper[service->
id].
name,
874 ipcs_mapper[service->
id].
id,
876 &corosync_service_funcs);
877 assert(ipcs_mapper[service->
id].
inst);
878 qb_ipcs_poll_handlers_set(ipcs_mapper[service->
id].
inst,
879 &corosync_poll_funcs);
880 if (qb_ipcs_run(ipcs_mapper[service->
id].
inst) != 0) {
882 return "qb_ipcs_run error";
void cs_ipc_refcnt_dec(void *conn)
int32_t cs_ipcs_q_level_get(void)
const char * cs_ipcs_service_init(struct corosync_service_engine *service)
#define CS_IPCS_MAPPER_SERV_NAME
const char * icmap_iter_next(icmap_iter_t iter, size_t *value_len, icmap_value_types_t *type)
#define LOGSYS_LEVEL_INFO
void cs_ipc_refcnt_inc(void *conn)
struct list_head outq_head
Totem Single Ring Protocol.
void icmap_iter_finalize(icmap_iter_t iter)
qb_loop_t * cs_poll_handle_get(void)
void corosync_recheck_the_q_level(void *data)
int(* quorum_register_callback)(quorum_callback_fn_t callback_fn, void *context)
cs_error_t icmap_set_string(const char *key_name, const char *value)
int cs_ipcs_dispatch_iov_send(void *conn, const struct iovec *iov, unsigned int iov_len)
struct corosync_service_engine * corosync_service[SERVICES_COUNT_MAX]
cs_error_t icmap_inc(const char *key_name)
#define log_printf(level, format, args...)
void cs_ipcs_sync_state_changed(int32_t sync_in_process)
int corosync_sending_allowed(unsigned int service, unsigned int id, const void *msg, void *sending_allowed_private_data)
int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen)
void cs_ipc_allow_connections(int32_t allow)
int(* lib_exit_fn)(void *conn)
#define ICMAP_KEYNAME_MAXLEN
cs_error_t icmap_get_uint8(const char *key_name, uint8_t *u8)
#define LOGSYS_LEVEL_WARNING
struct corosync_lib_handler * lib_engine
cs_error_t icmap_set_uint32(const char *key_name, uint32_t value)
#define LOGSYS_LEVEL_ERROR
cs_error_t icmap_delete(const char *key_name)
int cs_ipcs_response_iov_send(void *conn, const struct iovec *iov, unsigned int iov_len)
#define LOGSYS_LEVEL_DEBUG
LOGSYS_DECLARE_SUBSYS("MAIN")
cs_error_t icmap_dec(const char *key_name)
cs_error_t icmap_set_uint64(const char *key_name, uint64_t value)
struct corosync_api_v1 * apidef_get(void)
void corosync_sending_allowed_release(void *sending_allowed_private_data)
void cs_ipcs_stats_update(void)
#define SERVICES_COUNT_MAX
int32_t cs_ipcs_service_destroy(int32_t service_id)
void totempg_queue_level_register_callback(totem_queue_level_changed_fn)
cs_error_t icmap_get_string(const char *key_name, char **str)
#define list_entry(ptr, type, member)
char name[CS_IPCS_MAPPER_SERV_NAME]
void * cs_ipcs_private_data_get(void *conn)
int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
#define LOGSYS_LEVEL_NOTICE
void(* lib_handler_fn)(void *conn, const void *msg)
void icmap_convert_name_to_valid_name(char *key_name)
unsigned int private_data_size
icmap_iter_t icmap_iter_init(const char *prefix)
qb_map_iter_t * icmap_iter_t