41 #include <sys/types.h> 42 #include <sys/socket.h> 44 #include <sys/ioctl.h> 45 #include <netinet/in.h> 55 #include <netinet/in.h> 56 #include <arpa/inet.h> 61 #include <qb/qbipc_common.h> 71 #define MAP_ANONYMOUS MAP_ANON 78 #define GROUP_HASH_SIZE 32 150 static struct list_head downlist_messages_head;
151 static struct list_head joinlist_messages_head;
180 static unsigned int my_member_list_entries;
184 static unsigned int my_old_member_list_entries = 0;
210 static int cpg_lib_init_fn (
void *conn);
212 static int cpg_lib_exit_fn (
void *conn);
214 static void message_handler_req_exec_cpg_procjoin (
218 static void message_handler_req_exec_cpg_procleave (
222 static void message_handler_req_exec_cpg_joinlist (
226 static void message_handler_req_exec_cpg_mcast (
230 static void message_handler_req_exec_cpg_partial_mcast (
234 static void message_handler_req_exec_cpg_downlist_old (
238 static void message_handler_req_exec_cpg_downlist (
242 static void exec_cpg_procjoin_endian_convert (
void *msg);
244 static void exec_cpg_joinlist_endian_convert (
void *msg);
246 static void exec_cpg_mcast_endian_convert (
void *msg);
248 static void exec_cpg_partial_mcast_endian_convert (
void *msg);
250 static void exec_cpg_downlist_endian_convert_old (
void *msg);
252 static void exec_cpg_downlist_endian_convert (
void *msg);
254 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message);
256 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message);
258 static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message);
260 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message);
262 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message);
264 static void message_handler_req_lib_cpg_membership (
void *conn,
265 const void *message);
267 static void message_handler_req_lib_cpg_local_get (
void *conn,
268 const void *message);
270 static void message_handler_req_lib_cpg_iteration_initialize (
272 const void *message);
274 static void message_handler_req_lib_cpg_iteration_next (
276 const void *message);
278 static void message_handler_req_lib_cpg_iteration_finalize (
280 const void *message);
282 static void message_handler_req_lib_cpg_zc_alloc (
284 const void *message);
286 static void message_handler_req_lib_cpg_zc_free (
288 const void *message);
290 static void message_handler_req_lib_cpg_zc_execute (
292 const void *message);
294 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason);
296 static int cpg_exec_send_downlist(
void);
298 static int cpg_exec_send_joinlist(
void);
300 static void downlist_messages_delete (
void);
302 static void downlist_master_choose_and_send (
void);
304 static void joinlist_inform_clients (
void);
306 static void joinlist_messages_delete (
void);
308 static void cpg_sync_init (
309 const unsigned int *trans_list,
310 size_t trans_list_entries,
311 const unsigned int *member_list,
312 size_t member_list_entries,
315 static int cpg_sync_process (
void);
317 static void cpg_sync_activate (
void);
319 static void cpg_sync_abort (
void);
321 static void do_proc_join(
327 static void do_proc_leave(
333 static int notify_lib_totem_membership (
335 int member_list_entries,
336 const unsigned int *member_list);
338 static inline int zcb_all_free (
341 static char *cpg_print_group_name (
354 .lib_handler_fn = message_handler_req_lib_cpg_leave,
358 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
362 .lib_handler_fn = message_handler_req_lib_cpg_membership,
366 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
370 .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
374 .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
378 .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
382 .lib_handler_fn = message_handler_req_lib_cpg_finalize,
386 .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
390 .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
394 .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
398 .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
408 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
411 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
412 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
415 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
416 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
419 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
420 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
423 .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
424 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
427 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
428 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
431 .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
432 .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
437 .
name =
"corosync cluster closed process group service v1.01",
440 .private_data_size =
sizeof (
struct cpg_pd),
443 .lib_init_fn = cpg_lib_init_fn,
444 .lib_exit_fn = cpg_lib_exit_fn,
445 .lib_engine = cpg_lib_engine,
447 .exec_init_fn = cpg_exec_init_fn,
448 .exec_dump_fn = NULL,
449 .exec_engine = cpg_exec_engine,
451 .sync_init = cpg_sync_init,
452 .sync_process = cpg_sync_process,
453 .sync_activate = cpg_sync_activate,
454 .sync_abort = cpg_sync_abort
459 return (&cpg_service_engine);
463 struct qb_ipc_request_header header __attribute__((aligned(8)));
470 struct qb_ipc_request_header header __attribute__((aligned(8)));
479 struct qb_ipc_request_header header __attribute__((aligned(8)));
490 struct qb_ipc_request_header header __attribute__((aligned(8)));
496 struct qb_ipc_request_header header __attribute__((aligned(8)));
531 for (i = 0; i < group->length; i++) {
534 if (c >=
' ' && c < 0x7f && c !=
'\\') {
538 res[dest_pos++] =
'\\';
539 res[dest_pos++] =
'\\';
541 snprintf(res + dest_pos,
sizeof(res) - dest_pos,
"\\x%02X", c);
551 static void cpg_sync_init (
552 const unsigned int *trans_list,
553 size_t trans_list_entries,
554 const unsigned int *member_list,
555 size_t member_list_entries,
564 memcpy (my_member_list, member_list, member_list_entries *
565 sizeof (
unsigned int));
566 my_member_list_entries = member_list_entries;
568 last_sync_ring_id.nodeid = ring_id->
rep.
nodeid;
569 last_sync_ring_id.seq = ring_id->
seq;
577 for (i = 0; i < my_old_member_list_entries; i++) {
579 for (j = 0; j < trans_list_entries; j++) {
580 if (my_old_member_list[i] == trans_list[j]) {
586 g_req_exec_cpg_downlist.nodeids[entries++] =
587 my_old_member_list[i];
590 g_req_exec_cpg_downlist.left_nodes = entries;
593 static int cpg_sync_process (
void)
598 res = cpg_exec_send_downlist();
605 res = cpg_exec_send_joinlist();
610 static void cpg_sync_activate (
void)
612 memcpy (my_old_member_list, my_member_list,
613 my_member_list_entries *
sizeof (
unsigned int));
614 my_old_member_list_entries = my_member_list_entries;
617 downlist_master_choose_and_send ();
620 joinlist_inform_clients ();
622 downlist_messages_delete ();
624 joinlist_messages_delete ();
626 notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
629 static void cpg_sync_abort (
void)
632 downlist_messages_delete ();
633 joinlist_messages_delete ();
636 static int notify_lib_totem_membership (
638 int member_list_entries,
639 const unsigned int *member_list)
653 res->member_list_entries = member_list_entries;
654 res->header.size =
size;
656 res->header.error =
CS_OK;
662 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
673 static int notify_lib_joinlist(
676 int joined_list_entries,
678 int left_list_entries,
691 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
693 if (mar_name_compare (&pi->
group, group_name) == 0) {
697 for (i = 0; i < left_list_entries; i++) {
698 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
715 res->joined_list_entries = joined_list_entries;
716 res->left_list_entries = left_list_entries;
717 res->member_list_entries = count;
719 res->header.size =
size;
721 res->header.error =
CS_OK;
724 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
727 if (mar_name_compare (&pi->
group, group_name) == 0) {
731 for (i = 0;i < left_list_entries; i++) {
732 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
738 retgi->nodeid = pi->
nodeid;
739 retgi->pid = pi->
pid;
745 if (left_list_entries) {
747 retgi += left_list_entries;
750 if (joined_list_entries) {
752 retgi += joined_list_entries;
758 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
760 if (mar_name_compare (&cpd->
group_name, group_name) == 0) {
761 assert (joined_list_entries <= 1);
762 if (joined_list_entries) {
763 if (joined_list[0].
pid == cpd->
pid &&
774 if (left_list_entries) {
775 if (left_list[0].
pid == cpd->
pid &&
792 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
798 notify_lib_totem_membership (cpd->
conn, my_old_member_list_entries, my_old_member_list);
805 static void downlist_log(
const char *msg,
struct downlist_msg* dl)
808 "%s: sender %s; members(old:%d left:%d)",
815 static struct downlist_msg* downlist_master_choose (
void)
820 uint32_t cmp_members;
821 uint32_t best_members;
825 for (iter = downlist_messages_head.
next;
826 iter != &downlist_messages_head;
830 downlist_log(
"comparing", cmp);
833 for (i = 0; i < cmp->left_nodes; i++) {
835 log_printf (LOG_DEBUG,
"Ignoring this entry because I'm in the left list\n");
851 best_members = best->old_members - best->left_nodes;
852 cmp_members = cmp->old_members - cmp->left_nodes;
854 if (cmp_members > best_members) {
856 }
else if (cmp_members == best_members) {
857 if (cmp->old_members > best->old_members) {
859 }
else if (cmp->old_members == best->old_members) {
867 assert (best != NULL);
872 static void downlist_master_choose_and_send (
void)
883 int left_list_entries;
886 qb_map_iter_t *miter;
891 stored_msg = downlist_master_choose ();
896 downlist_log(
"chosen downlist", stored_msg);
898 group_map = qb_skiplist_create();
905 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
910 for (i = 0; i < stored_msg->left_nodes; i++) {
912 if (pi->
nodeid == stored_msg->nodeids[i]) {
919 marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->
group);
920 cpg_group.value[cpg_group.length] = 0;
922 pcd = (
struct confchg_data *)qb_map_get(group_map, cpg_group.value);
924 pcd = (
struct confchg_data *)calloc(1,
sizeof(
struct confchg_data));
925 memcpy(&pcd->cpg_group, &cpg_group,
sizeof(
struct cpg_name));
926 qb_map_put(group_map, pcd->cpg_group.value, pcd);
928 size = pcd->left_list_entries;
929 pcd->left_list[
size].nodeid = left_pi->
nodeid;
930 pcd->left_list[
size].pid = left_pi->
pid;
932 pcd->left_list_entries++;
933 list_del (&left_pi->
list);
939 miter = qb_map_iter_create(group_map);
940 while (qb_map_iter_next(miter, (
void **)&pcd)) {
941 marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
943 log_printf (LOG_DEBUG,
"left_list_entries:%d", pcd->left_list_entries);
944 for (i=0; i<pcd->left_list_entries; i++) {
945 log_printf (LOG_DEBUG,
"left_list[%d] group:%s, ip:%s, pid:%d",
946 i, cpg_print_group_name(&group),
948 pcd->left_list[i].pid);
952 notify_lib_joinlist(&group, NULL,
954 pcd->left_list_entries,
960 qb_map_iter_free(miter);
961 qb_map_destroy(group_map);
967 static void joinlist_remove_zombie_pi_entries (
void)
975 for (pi_iter = process_info_list_head.
next; pi_iter != &process_info_list_head; ) {
977 pi_iter = pi_iter->
next;
990 for (jl_iter = joinlist_messages_head.
next;
991 jl_iter != &joinlist_messages_head;
992 jl_iter = jl_iter->
next) {
1001 pi->
pid == stored_msg->
pid &&
1014 static void joinlist_inform_clients (
void)
1021 for (iter = joinlist_messages_head.
next;
1022 iter != &joinlist_messages_head;
1023 iter = iter->
next) {
1027 log_printf (LOG_DEBUG,
"joinlist_messages[%u] group:%s, ip:%s, pid:%d",
1028 i++, cpg_print_group_name(&stored_msg->
group_name),
1041 joinlist_remove_zombie_pi_entries ();
1044 static void downlist_messages_delete (
void)
1049 for (iter = downlist_messages_head.
next;
1050 iter != &downlist_messages_head;
1053 iter_next = iter->
next;
1056 list_del (&stored_msg->
list);
1061 static void joinlist_messages_delete (
void)
1066 for (iter = joinlist_messages_head.
next;
1067 iter != &joinlist_messages_head;
1070 iter_next = iter->
next;
1073 list_del (&stored_msg->
list);
1076 list_init (&joinlist_messages_head);
1081 list_init (&downlist_messages_head);
1082 list_init (&joinlist_messages_head);
1096 iter_next = iter->
next;
1099 list_del (&pi->
list);
1103 list_del (&cpg_iteration_instance->
list);
1104 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
1107 static void cpg_pd_finalize (
struct cpg_pd *cpd)
1110 struct cpg_iteration_instance *cpii;
1117 iter_next = iter->
next;
1119 cpii =
list_entry (iter,
struct cpg_iteration_instance, list);
1121 cpg_iteration_instance_finalize (cpii);
1124 list_del (&cpd->
list);
1127 static int cpg_lib_exit_fn (
void *conn)
1138 cpg_pd_finalize (cpd);
1144 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason)
1147 struct iovec req_exec_cpg_iovec;
1166 static void exec_cpg_procjoin_endian_convert (
void *msg)
1170 req_exec_cpg_procjoin->pid =
swab32(req_exec_cpg_procjoin->pid);
1171 swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1172 req_exec_cpg_procjoin->reason =
swab32(req_exec_cpg_procjoin->reason);
1175 static void exec_cpg_joinlist_endian_convert (
void *msg_v)
1178 struct qb_ipc_response_header *res = (
struct qb_ipc_response_header *)msg;
1181 swab_mar_int32_t (&res->size);
1183 while ((
const char*)jle < msg + res->size) {
1190 static void exec_cpg_downlist_endian_convert_old (
void *msg)
1194 static void exec_cpg_downlist_endian_convert (
void *msg)
1199 req_exec_cpg_downlist->left_nodes =
swab32(req_exec_cpg_downlist->left_nodes);
1200 req_exec_cpg_downlist->old_members =
swab32(req_exec_cpg_downlist->old_members);
1202 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1203 req_exec_cpg_downlist->nodeids[i] =
swab32(req_exec_cpg_downlist->nodeids[i]);
1208 static void exec_cpg_mcast_endian_convert (
void *msg)
1212 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1213 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1214 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1215 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1216 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1219 static void exec_cpg_partial_mcast_endian_convert (
void *msg)
1223 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1224 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1225 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1226 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1227 req_exec_cpg_mcast->fraglen =
swab32(req_exec_cpg_mcast->fraglen);
1228 req_exec_cpg_mcast->type =
swab32(req_exec_cpg_mcast->type);
1229 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1235 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1239 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1240 mar_name_compare (&pi->
group, group_name) == 0) {
1248 static void do_proc_join(
1251 unsigned int nodeid,
1260 if (process_info_find (name, pid, nodeid) != NULL) {
1270 memcpy(&pi->
group, name,
sizeof(*name));
1271 list_init(&pi->
list);
1276 list_to_add = &process_info_list_head;
1277 for (list = process_info_list_head.
next; list != &process_info_list_head; list = list->
next) {
1287 list_add (&pi->
list, list_to_add);
1289 notify_info.pid = pi->
pid;
1290 notify_info.nodeid =
nodeid;
1291 notify_info.reason = reason;
1293 notify_lib_joinlist(&pi->
group, NULL,
1299 static void do_proc_leave(
1302 unsigned int nodeid,
1309 notify_info.pid = pid;
1310 notify_info.nodeid =
nodeid;
1311 notify_info.reason = reason;
1313 notify_lib_joinlist(name, NULL,
1318 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1322 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1323 mar_name_compare (&pi->
group, name)==0) {
1324 list_del (&pi->
list);
1330 static void message_handler_req_exec_cpg_downlist_old (
1331 const void *message,
1332 unsigned int nodeid)
1338 static void message_handler_req_exec_cpg_downlist(
1339 const void *message,
1340 unsigned int nodeid)
1342 const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1350 req_exec_cpg_downlist->left_nodes, downlist_state);
1356 stored_msg->old_members = req_exec_cpg_downlist->old_members;
1357 stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
1358 memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
1359 req_exec_cpg_downlist->left_nodes * sizeof (
mar_uint32_t));
1360 list_init (&stored_msg->
list);
1361 list_add (&stored_msg->
list, &downlist_messages_head);
1363 for (i = 0; i < my_member_list_entries; i++) {
1365 for (iter = downlist_messages_head.
next;
1366 iter != &downlist_messages_head;
1367 iter = iter->
next) {
1379 downlist_master_choose_and_send ();
1383 static void message_handler_req_exec_cpg_procjoin (
1384 const void *message,
1385 unsigned int nodeid)
1392 (
unsigned int)req_exec_cpg_procjoin->pid);
1394 do_proc_join (&req_exec_cpg_procjoin->group_name,
1395 req_exec_cpg_procjoin->pid, nodeid,
1399 static void message_handler_req_exec_cpg_procleave (
1400 const void *message,
1401 unsigned int nodeid)
1408 (
unsigned int)req_exec_cpg_procjoin->pid);
1410 do_proc_leave (&req_exec_cpg_procjoin->group_name,
1411 req_exec_cpg_procjoin->pid, nodeid,
1412 req_exec_cpg_procjoin->reason);
1417 static void message_handler_req_exec_cpg_joinlist (
1418 const void *message_v,
1419 unsigned int nodeid)
1421 const char *message = message_v;
1422 const struct qb_ipc_response_header *res = (
const struct qb_ipc_response_header *)message;
1429 while ((
const char*)jle < message + res->size) {
1433 stored_msg->
pid = jle->
pid;
1435 list_init (&stored_msg->
list);
1436 list_add (&stored_msg->
list, &joinlist_messages_head);
1441 static void message_handler_req_exec_cpg_mcast (
1442 const void *message,
1443 unsigned int nodeid)
1447 int msglen = req_exec_cpg_mcast->msglen;
1450 struct iovec iovec[2];
1464 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1465 iovec[1].iov_len = msglen;
1467 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; ) {
1472 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1476 for (pi_iter = process_info_list_head.
next;
1477 pi_iter != &process_info_list_head; pi_iter = pi_iter->
next) {
1481 if (pi->
nodeid == nodeid &&
1482 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1499 static void message_handler_req_exec_cpg_partial_mcast (
1500 const void *message,
1501 unsigned int nodeid)
1505 int msglen = req_exec_cpg_mcast->fraglen;
1508 struct iovec iovec[2];
1526 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1527 iovec[1].iov_len = msglen;
1529 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; ) {
1534 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1538 for (pi_iter = process_info_list_head.
next;
1539 pi_iter != &process_info_list_head; pi_iter = pi_iter->
next) {
1543 if (pi->
nodeid == nodeid &&
1544 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1562 static int cpg_exec_send_downlist(
void)
1567 g_req_exec_cpg_downlist.header.size =
sizeof(
struct req_exec_cpg_downlist);
1569 g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1571 iov.iov_base = (
void *)&g_req_exec_cpg_downlist;
1572 iov.iov_len = g_req_exec_cpg_downlist.header.size;
1577 static int cpg_exec_send_joinlist(
void)
1581 struct qb_ipc_response_header *res;
1584 struct iovec req_exec_cpg_iovec;
1586 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1598 buf = alloca(
sizeof(
struct qb_ipc_response_header) +
sizeof(
struct join_list_entry) * count);
1604 jle = (
struct join_list_entry *)(buf +
sizeof(
struct qb_ipc_response_header));
1605 res = (
struct qb_ipc_response_header *)buf;
1607 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1618 res->size =
sizeof(
struct qb_ipc_response_header)+sizeof(struct
join_list_entry) * count;
1620 req_exec_cpg_iovec.iov_base = buf;
1621 req_exec_cpg_iovec.iov_len = res->size;
1626 static int cpg_lib_init_fn (
void *conn)
1629 memset (cpd, 0,
sizeof(
struct cpg_pd));
1631 list_add (&cpd->
list, &cpg_pd_list_head);
1642 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message)
1651 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
1654 if (cpd_item->
pid == req_lib_cpg_join->pid &&
1655 mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->
group_name) == 0) {
1667 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1671 mar_name_compare(&req_lib_cpg_join->group_name, &pi->
group) == 0) {
1687 cpd->
pid = req_lib_cpg_join->pid;
1688 cpd->
flags = req_lib_cpg_join->flags;
1689 memcpy (&cpd->
group_name, &req_lib_cpg_join->group_name,
1692 cpg_node_joinleave_send (req_lib_cpg_join->pid,
1693 &req_lib_cpg_join->group_name,
1715 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message)
1737 cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1738 &req_lib_cpg_leave->group_name,
1752 static void message_handler_req_lib_cpg_finalize (
1754 const void *message)
1766 list_del (&cpd->
list);
1767 list_init (&cpd->
list);
1787 fd = open (path, O_RDWR, 0600);
1795 res = ftruncate (fd, bytes);
1797 goto error_close_unlink;
1800 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1803 if (addr == MAP_FAILED) {
1804 goto error_close_unlink;
1807 madvise(addr, bytes, MADV_NOSYNC);
1823 static inline int zcb_alloc (
1825 const char *path_to_file,
1832 zcb_mapped = malloc (
sizeof (
struct zcb_mapped));
1833 if (zcb_mapped == NULL) {
1846 list_init (&zcb_mapped->
list);
1854 static inline int zcb_free (
struct zcb_mapped *zcb_mapped)
1858 res = munmap (zcb_mapped->
addr, zcb_mapped->
size);
1859 list_del (&zcb_mapped->
list);
1864 static inline int zcb_by_addr_free (
struct cpg_pd *cpd,
void *addr)
1867 struct zcb_mapped *zcb_mapped;
1868 unsigned int res = 0;
1873 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1875 if (zcb_mapped->
addr == addr) {
1876 res = zcb_free (zcb_mapped);
1884 static inline int zcb_all_free (
1888 struct zcb_mapped *zcb_mapped;
1893 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1897 zcb_free (zcb_mapped);
1907 static uint64_t void2serveraddr (
void *server_ptr)
1915 static void *serveraddr2void (uint64_t
server_addr)
1923 static void message_handler_req_lib_cpg_zc_alloc (
1925 const void *message)
1928 struct qb_ipc_response_header res_header;
1936 res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1943 res_header.size =
sizeof (
struct qb_ipc_response_header);
1950 static void message_handler_req_lib_cpg_zc_free (
1952 const void *message)
1955 struct qb_ipc_response_header res_header;
1961 addr = serveraddr2void (hdr->server_address);
1963 zcb_by_addr_free (cpd, addr);
1965 res_header.size =
sizeof (
struct qb_ipc_response_header);
1973 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message)
1979 struct iovec req_exec_cpg_iovec[2];
1982 int msglen = req_lib_cpg_mcast->fraglen;
2004 res_lib_cpg_partial_send.header.size =
sizeof(res_lib_cpg_partial_send);
2014 if (error ==
CS_OK) {
2015 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
2018 req_exec_cpg_mcast.pid = cpd->
pid;
2019 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2020 req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
2021 req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
2023 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2026 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2027 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2028 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
2029 req_exec_cpg_iovec[1].iov_len = msglen;
2032 assert(result == 0);
2035 conn, group_name.value, cpd->
cpd_state, error);
2038 res_lib_cpg_partial_send.header.error = error;
2040 sizeof (res_lib_cpg_partial_send));
2044 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message)
2050 struct iovec req_exec_cpg_iovec[2];
2051 struct req_exec_cpg_mcast req_exec_cpg_mcast;
2052 int msglen = req_lib_cpg_mcast->msglen;
2073 if (error ==
CS_OK) {
2074 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
2077 req_exec_cpg_mcast.pid = cpd->
pid;
2078 req_exec_cpg_mcast.msglen = msglen;
2080 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2083 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2084 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2085 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
2086 req_exec_cpg_iovec[1].iov_len = msglen;
2089 assert(result == 0);
2092 conn, group_name.value, cpd->
cpd_state, error);
2096 static void message_handler_req_lib_cpg_zc_execute (
2098 const void *message)
2101 struct qb_ipc_request_header *
header;
2104 struct iovec req_exec_cpg_iovec[2];
2105 struct req_exec_cpg_mcast req_exec_cpg_mcast;
2112 header = (
struct qb_ipc_request_header *)(((
char *)serveraddr2void(hdr->server_address) + sizeof (
struct coroipcs_zc_header)));
2113 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)header;
2130 res_lib_cpg_mcast.header.size =
sizeof(res_lib_cpg_mcast);
2132 if (error ==
CS_OK) {
2133 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
2136 req_exec_cpg_mcast.pid = cpd->
pid;
2137 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2139 memcpy(&req_exec_cpg_mcast.group_name, &cpd->
group_name,
2142 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2143 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2144 req_exec_cpg_iovec[1].iov_base = (
char *)header +
sizeof(
struct req_lib_cpg_mcast);
2145 req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2149 res_lib_cpg_mcast.header.error =
CS_OK;
2154 res_lib_cpg_mcast.header.error = error;
2158 sizeof (res_lib_cpg_mcast));
2162 static void message_handler_req_lib_cpg_membership (
void *conn,
2163 const void *message)
2166 (
struct req_lib_cpg_membership_get *)message;
2169 int member_count = 0;
2172 res_lib_cpg_membership_get.header.error =
CS_OK;
2173 res_lib_cpg_membership_get.header.size =
2174 sizeof (
struct res_lib_cpg_membership_get);
2176 for (iter = process_info_list_head.
next;
2177 iter != &process_info_list_head; iter = iter->
next) {
2180 if (mar_name_compare (&pi->
group, &req_lib_cpg_membership_get->group_name) == 0) {
2182 res_lib_cpg_membership_get.
member_list[member_count].pid = pi->
pid;
2186 res_lib_cpg_membership_get.member_count = member_count;
2189 sizeof (res_lib_cpg_membership_get));
2192 static void message_handler_req_lib_cpg_local_get (
void *conn,
2193 const void *message)
2197 res_lib_cpg_local_get.header.size =
sizeof (res_lib_cpg_local_get);
2199 res_lib_cpg_local_get.header.error =
CS_OK;
2203 sizeof (res_lib_cpg_local_get));
2206 static void message_handler_req_lib_cpg_iteration_initialize (
2208 const void *message)
2215 struct cpg_iteration_instance *cpg_iteration_instance;
2228 res = hdb_handle_create (&cpg_iteration_handle_t_db,
sizeof (
struct cpg_iteration_instance),
2229 &cpg_iteration_handle);
2236 res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (
void *)&cpg_iteration_instance);
2244 cpg_iteration_instance->
handle = cpg_iteration_handle;
2249 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
2261 iter2 = iter2->
next) {
2264 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2280 if (mar_name_compare (&pi->
group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2293 goto error_put_destroy;
2297 list_init (&new_pi->
list);
2311 iter2 = iter2->
next) {
2314 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2319 list_add (&new_pi->
list, iter2);
2329 list_init (&cpg_iteration_instance->
list);
2335 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2337 if (error !=
CS_OK) {
2338 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2342 res_lib_cpg_iterationinitialize.header.size =
sizeof (res_lib_cpg_iterationinitialize);
2344 res_lib_cpg_iterationinitialize.header.error = error;
2345 res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2348 sizeof (res_lib_cpg_iterationinitialize));
2351 static void message_handler_req_lib_cpg_iteration_next (
2353 const void *message)
2357 struct cpg_iteration_instance *cpg_iteration_instance;
2364 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2365 req_lib_cpg_iterationnext->iteration_handle,
2366 (
void *)&cpg_iteration_instance);
2373 assert (cpg_iteration_instance);
2387 res_lib_cpg_iterationnext.description.nodeid = pi->
nodeid;
2388 res_lib_cpg_iterationnext.description.pid = pi->
pid;
2389 memcpy (&res_lib_cpg_iterationnext.description.group,
2394 hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2396 res_lib_cpg_iterationnext.header.size =
sizeof (res_lib_cpg_iterationnext);
2398 res_lib_cpg_iterationnext.header.error = error;
2401 sizeof (res_lib_cpg_iterationnext));
2404 static void message_handler_req_lib_cpg_iteration_finalize (
2406 const void *message)
2410 struct cpg_iteration_instance *cpg_iteration_instance;
2416 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2417 req_lib_cpg_iterationfinalize->iteration_handle,
2418 (
void *)&cpg_iteration_instance);
2425 assert (cpg_iteration_instance);
2427 cpg_iteration_instance_finalize (cpg_iteration_instance);
2428 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
2431 res_lib_cpg_iterationfinalize.header.size =
sizeof (res_lib_cpg_iterationfinalize);
2433 res_lib_cpg_iterationfinalize.header.error = error;
2436 sizeof (res_lib_cpg_iterationfinalize));
void *(* ipc_private_data_get)(void *conn)
int initial_totem_conf_sent
mar_cpg_address_t member_list[]
mar_uint32_t sender_nodeid
#define CPG_MAX_NAME_LENGTH
uint64_t initial_transition_counter
#define LOGSYS_LEVEL_TRACE
mar_uint32_t sender_nodeid
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
struct message_header header
struct list_head * current_pointer
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
uint64_t transition_counter
#define log_printf(level, format, args...)
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
struct list_head iteration_instance_list_head
#define SERVICE_ID_MAKE(a, b)
#define LOGSYS_LEVEL_WARNING
unsigned int(* totem_nodeid_get)(void)
void(* ipc_refcnt_dec)(void *conn)
#define LOGSYS_LEVEL_ERROR
struct totem_ip_address rep
mar_uint32_t member_list[]
#define LOGSYS_LEVEL_DEBUG
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
mar_cpg_name_t group_name
mar_cpg_name_t group_name
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
#define PROCESSOR_COUNT_MAX
struct corosync_service_engine cpg_service_engine
struct list_head zcb_mapped_list_head
const char *(* totem_ifaces_print)(unsigned int nodeid)
#define list_entry(ptr, type, member)
mar_cpg_name_t group_name
void(* lib_handler_fn)(void *conn, const void *msg)
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
struct list_head items_list_head
struct memb_ring_id ring_id
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
DECLARE_LIST_INIT(cpg_pd_list_head)
Message from another node.
void(* ipc_refcnt_inc)(void *conn)