57 #include <sys/types.h> 59 #include <sys/socket.h> 62 #include <sys/ioctl.h> 63 #include <sys/param.h> 64 #include <netinet/in.h> 65 #include <arpa/inet.h> 78 #include <qb/qbdefs.h> 79 #include <qb/qbutil.h> 80 #include <qb/qbloop.h> 86 #define LOGSYS_UTILS_ONLY 1 95 #define LOCALHOST_IP inet_addr("127.0.0.1") 96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 100 #define RETRANSMIT_ENTRIES_MAX 30 101 #define TOKEN_SIZE_MAX 64000 102 #define LEAVE_DUMMY_NODEID 0 114 #define SEQNO_START_MSG 0x0 115 #define SEQNO_START_TOKEN 0x0 137 #define ENDIAN_LOCAL 0xff22 374 struct sq regular_sort_queue;
376 struct sq recovery_sort_queue;
433 void (*totemsrp_log_printf) (
436 const char *
function,
439 const char *format, ...)
__attribute__((format(printf, 6, 7)));;
449 void (*totemsrp_deliver_fn) (
452 unsigned int msg_len,
453 int endian_conversion_required);
455 void (*totemsrp_confchg_fn) (
457 const unsigned int *member_list,
size_t member_list_entries,
458 const unsigned int *left_list,
size_t left_list_entries,
459 const unsigned int *joined_list,
size_t joined_list_entries,
462 void (*totemsrp_service_ready_fn) (void);
464 void (*totemsrp_waiting_trans_ack_cb_fn) (
465 int waiting_trans_ack);
467 void (*memb_ring_id_create_or_load) (
471 void (*memb_ring_id_store) (
472 const struct memb_ring_id *memb_ring_id,
525 char commit_token_storage[40000];
530 int (*handler_functions[6]) (
534 int endian_conversion_needed);
579 static int message_handler_orf_token (
583 int endian_conversion_needed);
585 static int message_handler_mcast (
589 int endian_conversion_needed);
591 static int message_handler_memb_merge_detect (
595 int endian_conversion_needed);
597 static int message_handler_memb_join (
601 int endian_conversion_needed);
603 static int message_handler_memb_commit_token (
607 int endian_conversion_needed);
609 static int message_handler_token_hold_cancel (
613 int endian_conversion_needed);
617 static unsigned int main_msgs_missing (
void);
619 static void main_token_seqid_get (
622 unsigned int *token_is);
624 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src);
626 static void srp_addr_to_nodeid (
627 unsigned int *nodeid_out,
629 unsigned int entries);
631 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
637 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
639 int fcc_mcasts_allowed);
640 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
644 static void target_set_completed (
void *context);
646 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
652 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out);
653 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
654 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
655 static void memb_merge_detect_endian_convert (
658 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (
void *data);
660 static void timer_function_pause_timeout (
void *data);
661 static void timer_function_heartbeat_timeout (
void *data);
662 static void timer_function_token_retransmit_timeout (
void *data);
663 static void timer_function_token_hold_retransmit_timeout (
void *data);
664 static void timer_function_merge_detect_timeout (
void *data);
666 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
672 unsigned int msg_len);
677 unsigned int iface_no);
682 message_handler_orf_token,
683 message_handler_mcast,
684 message_handler_memb_merge_detect,
685 message_handler_memb_join,
686 message_handler_memb_commit_token,
687 message_handler_token_hold_cancel
691 #define log_printf(level, format, args...) \ 693 instance->totemsrp_log_printf ( \ 694 level, instance->totemsrp_subsys_id, \ 695 __FUNCTION__, __FILE__, __LINE__, \ 698 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 700 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 701 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 702 instance->totemsrp_log_printf ( \ 703 level, instance->totemsrp_subsys_id, \ 704 __FUNCTION__, __FILE__, __LINE__, \ 705 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ 711 return gather_state_from_desc[gsfrom];
751 static void main_token_seqid_get (
754 unsigned int *token_is)
766 static unsigned int main_msgs_missing (
void)
775 uint64_t timestamp_msec;
778 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
783 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
798 unsigned long long nano_secs = qb_util_nano_current_get ();
800 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
833 qb_loop_t *poll_handle,
841 unsigned int msg_len,
842 int endian_conversion_required),
846 const unsigned int *member_list,
size_t member_list_entries,
847 const unsigned int *left_list,
size_t left_list_entries,
848 const unsigned int *joined_list,
size_t joined_list_entries,
850 void (*waiting_trans_ack_cb_fn) (
856 if (instance == NULL) {
860 totemsrp_instance_initialize (instance);
898 "Token Timeout (%d ms) retransmit timeout (%d ms)",
901 "token hold (%d ms) retransmits before loss (%d retrans)",
904 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
911 "downcheck (%d ms) fail to recv const (%d msgs)",
917 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
921 "missed count const (%d messages)",
925 "send threads (%d threads)", totem_config->
threads);
927 "RRP token expired timeout (%d ms)",
930 "RRP token problem counter (%d ms)",
933 "RRP threshold (%d problem count)",
936 "RRP multicast threshold (%d problem count)",
939 "RRP automatic recovery check timeout (%d ms)",
966 timer_function_pause_timeout (instance);
970 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
981 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
985 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
987 "heartbeat timeout should be less than the token timeout. HeartBeat is Diabled !!");
1003 main_iface_change_fn,
1004 main_token_seqid_get,
1006 target_set_completed);
1023 token_event_stats_collector,
1029 token_event_stats_collector,
1031 *srp_context = instance;
1044 memb_leave_message_send (instance);
1064 unsigned int nodeid,
1066 unsigned int interfaces_size,
1068 unsigned int *iface_count)
1072 unsigned int found = 0;
1085 if (interfaces_size >= *iface_count) {
1105 if (interfaces_size >= *iface_count) {
1122 const char *cipher_type,
1123 const char *hash_type)
1172 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
1177 for (i = 0; i < 1; i++) {
1186 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src)
1197 static void srp_addr_to_nodeid (
1198 unsigned int *nodeid_out,
1200 unsigned int entries)
1204 for (i = 0; i < entries; i++) {
1205 nodeid_out[i] = srp_addr_in[i].
addr[0].
nodeid;
1209 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in)
1223 static void memb_set_subtract (
1224 struct srp_addr *out_list,
int *out_list_entries,
1225 struct srp_addr *one_list,
int one_list_entries,
1226 struct srp_addr *two_list,
int two_list_entries)
1232 *out_list_entries = 0;
1234 for (i = 0; i < one_list_entries; i++) {
1235 for (j = 0; j < two_list_entries; j++) {
1236 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1242 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1243 *out_list_entries = *out_list_entries + 1;
1252 static void memb_consensus_set (
1279 static int memb_consensus_isset (
1296 static int memb_consensus_agreed (
1299 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1300 int token_memb_entries = 0;
1304 memb_set_subtract (token_memb, &token_memb_entries,
1308 for (i = 0; i < token_memb_entries; i++) {
1309 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1324 assert (token_memb_entries >= 1);
1329 static void memb_consensus_notset (
1331 struct srp_addr *no_consensus_list,
1332 int *no_consensus_list_entries,
1334 int comparison_list_entries)
1338 *no_consensus_list_entries = 0;
1341 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1342 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->
my_proc_list[i]);
1343 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1351 static int memb_set_equal (
1352 struct srp_addr *set1,
int set1_entries,
1353 struct srp_addr *set2,
int set2_entries)
1360 if (set1_entries != set2_entries) {
1363 for (i = 0; i < set2_entries; i++) {
1364 for (j = 0; j < set1_entries; j++) {
1365 if (srp_addr_equal (&set1[j], &set2[i])) {
1381 static int memb_set_subset (
1382 const struct srp_addr *subset,
int subset_entries,
1383 const struct srp_addr *fullset,
int fullset_entries)
1389 if (subset_entries > fullset_entries) {
1392 for (i = 0; i < subset_entries; i++) {
1393 for (j = 0; j < fullset_entries; j++) {
1394 if (srp_addr_equal (&subset[i], &fullset[j])) {
1408 static void memb_set_merge (
1409 const struct srp_addr *subset,
int subset_entries,
1410 struct srp_addr *fullset,
int *fullset_entries)
1416 for (i = 0; i < subset_entries; i++) {
1417 for (j = 0; j < *fullset_entries; j++) {
1418 if (srp_addr_equal (&fullset[j], &subset[i])) {
1424 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1425 *fullset_entries = *fullset_entries + 1;
1432 static void memb_set_and_with_ring_id (
1448 for (i = 0; i < set2_entries; i++) {
1449 for (j = 0; j < set1_entries; j++) {
1450 if (srp_addr_equal (&set1[j], &set2[i])) {
1451 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
1458 srp_addr_copy (&and[*and_entries], &set1[j]);
1459 *and_entries = *and_entries + 1;
1466 #ifdef CODE_COVERAGE 1467 static void memb_set_print (
1474 printf (
"List '%s' contains %d entries:\n",
string, list_entries);
1476 for (i = 0; i < list_entries; i++) {
1477 printf (
"Address %d with %d rings\n", i, list[i].
no_addrs);
1478 for (j = 0; j < list[i].
no_addrs; j++) {
1479 printf (
"\tiface %d %s\n", j,
totemip_print (&list[i].addr[j]));
1480 printf (
"\tfamily %d\n", list[i].addr[j].
family);
1485 static void my_leave_memb_clear(
1492 static unsigned int my_leave_memb_match(
1494 unsigned int nodeid)
1497 unsigned int ret = 0;
1508 static void my_leave_memb_set(
1510 unsigned int nodeid)
1527 "Cannot set LEAVE nodeid=%d", nodeid);
1534 assert (instance != NULL);
1538 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1540 assert (instance != NULL);
1552 timer_function_token_retransmit_timeout,
1564 timer_function_merge_detect_timeout,
1590 "Saving state aru %x high seq received %x",
1600 "Restoring instance->my_aru %x my high seq received %x",
1607 "Resetting old ring state");
1618 timer_function_pause_timeout,
1628 timer_function_orf_token_timeout,
1638 timer_function_heartbeat_timeout,
1651 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1656 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1662 timer_function_token_hold_retransmit_timeout,
1666 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1672 static void memb_state_consensus_timeout_expired (
1675 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1676 int no_consensus_list_entries;
1679 if (memb_consensus_agreed (instance)) {
1680 memb_consensus_reset (instance);
1682 memb_consensus_set (instance, &instance->
my_id);
1684 reset_token_timeout (instance);
1686 memb_consensus_notset (
1689 &no_consensus_list_entries,
1693 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1706 static void timer_function_pause_timeout (
void *data)
1711 reset_pause_timeout (instance);
1716 old_ring_state_restore (instance);
1721 static void timer_function_orf_token_timeout (
void *data)
1728 "The token was lost in the OPERATIONAL state.");
1730 "A processor failed, forming new configuration.");
1738 "The consensus timeout expired.");
1739 memb_state_consensus_timeout_expired (instance);
1746 "The token was lost in the COMMIT state.");
1753 "The token was lost in the RECOVERY state.");
1754 memb_recovery_state_token_loss (instance);
1760 static void timer_function_heartbeat_timeout (
void *data)
1764 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1765 timer_function_orf_token_timeout(data);
1768 static void memb_timer_function_state_gather (
void *data)
1779 memb_join_message_send (instance);
1790 memb_timer_function_state_gather,
1796 static void memb_timer_function_gather_consensus_timeout (
void *data)
1799 memb_state_consensus_timeout_expired (instance);
1802 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1807 unsigned int range = 0;
1820 for (i = 1; i <= range; i++) {
1826 recovery_message_item = ptr;
1831 mcast = recovery_message_item->
mcast;
1837 regular_message_item.
mcast =
1838 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1839 regular_message_item.
msg_len =
1840 recovery_message_item->
msg_len -
sizeof (
struct mcast);
1841 mcast = regular_message_item.
mcast;
1850 "comparing if ring id is for this processors old ring seqno %d",
1864 ®ular_message_item, mcast->
seq);
1871 "-not adding msg with seq no %x", mcast->
seq);
1881 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1882 int joined_list_entries = 0;
1883 unsigned int aru_save;
1890 char left_node_msg[1024];
1891 char joined_node_msg[1024];
1892 char failed_node_msg[1024];
1896 memb_consensus_reset (instance);
1898 old_ring_state_reset (instance);
1900 deliver_messages_from_recovery_to_regular (instance);
1903 "Delivering to app %x to %x",
1906 aru_save = instance->
my_aru;
1919 memb_set_subtract (joined_list, &joined_list_entries,
1947 srp_addr_to_nodeid (trans_memb_list_totemip,
1960 instance->
my_aru = aru_save;
1970 joined_list, joined_list_entries,
1975 srp_addr_to_nodeid (new_memb_list_totemip,
1977 srp_addr_to_nodeid (joined_list_totemip, joined_list,
1978 joined_list_entries);
1982 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2044 regular_message = ptr;
2045 free (regular_message->
mcast);
2051 if (joined_list_entries) {
2053 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2054 for (i=0; i< joined_list_entries; i++) {
2055 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" %u", joined_list_totemip[i]);
2059 joined_node_msg[0] =
'\0';
2065 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2067 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" %u", left_list[i]);
2070 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2072 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
2074 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(left_node_msg)-sptr2,
" %u", left_list[i]);
2078 failed_node_msg[0] =
'\0';
2082 left_node_msg[0] =
'\0';
2083 failed_node_msg[0] =
'\0';
2086 my_leave_memb_clear(instance);
2089 "entering OPERATIONAL state.");
2091 "A new membership (%s:%lld) was formed. Members%s%s",
2097 if (strlen(failed_node_msg)) {
2099 "Failed to receive the leave message.%s",
2110 reset_pause_timeout (instance);
2123 static void memb_state_gather_enter (
2132 &instance->
my_id, 1,
2135 memb_join_message_send (instance);
2146 memb_timer_function_state_gather,
2159 memb_timer_function_gather_consensus_timeout,
2165 cancel_token_retransmit_timeout (instance);
2166 cancel_token_timeout (instance);
2167 cancel_merge_detect_timeout (instance);
2169 memb_consensus_reset (instance);
2171 memb_consensus_set (instance, &instance->
my_id);
2174 "entering GATHER state from %d(%s).",
2175 gather_from, gsfrom_to_msg(gather_from));
2190 static void timer_function_token_retransmit_timeout (
void *data);
2192 static void target_set_completed (
2197 memb_state_commit_token_send (instance);
2201 static void memb_state_commit_enter (
2204 old_ring_state_save (instance);
2206 memb_state_commit_token_update (instance);
2208 memb_state_commit_token_target_set (instance);
2224 "entering COMMIT state.");
2227 reset_token_retransmit_timeout (instance);
2228 reset_token_timeout (instance);
2244 static void memb_state_recovery_enter (
2249 int local_received_flg = 1;
2250 unsigned int low_ring_aru;
2251 unsigned int range = 0;
2252 unsigned int messages_originated = 0;
2255 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2261 "entering RECOVERY state.");
2272 memb_state_commit_token_send_recovery (instance, commit_token);
2287 memcpy (&my_new_memb_ring_id_list[i],
2288 &memb_list[i].ring_id,
2291 memb_set_and_with_ring_id (
2293 my_new_memb_ring_id_list,
2307 "position [%d] member %s:", i,
totemip_print (&addr[i].addr[0]));
2309 "previous ring seq %llx rep %s",
2314 "aru %x high delivered %x received flag %d",
2332 local_received_flg = 0;
2336 if (local_received_flg == 1) {
2352 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2354 low_ring_aru = memb_list[i].
aru;
2375 "copying all old ring messages from %x-%x.",
2378 for (i = 1; i <= range; i++) {
2385 low_ring_aru + i, &ptr);
2389 sort_queue_item = ptr;
2390 messages_originated++;
2391 memset (&message_item, 0,
sizeof (
struct message_item));
2393 message_item.
mcast = totemsrp_buffer_alloc (instance);
2394 assert (message_item.
mcast);
2404 memcpy (((
char *)message_item.
mcast) + sizeof (
struct mcast),
2405 sort_queue_item->
mcast,
2410 "Originated %d messages in RECOVERY.", messages_originated);
2415 "Did not need to originate any messages in recovery.");
2425 reset_token_timeout (instance);
2426 reset_token_retransmit_timeout (instance);
2439 token_hold_cancel_send (instance);
2446 struct iovec *iovec,
2447 unsigned int iov_len,
2454 unsigned int addr_idx;
2463 if (cs_queue_is_full (queue_use)) {
2468 memset (&message_item, 0,
sizeof (
struct message_item));
2473 message_item.
mcast = totemsrp_buffer_alloc (instance);
2474 if (message_item.
mcast == 0) {
2481 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2491 addr = (
char *)message_item.
mcast;
2492 addr_idx = sizeof (
struct mcast);
2493 for (i = 0; i < iov_len; i++) {
2494 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2495 addr_idx += iovec[i].iov_len;
2498 message_item.
msg_len = addr_idx;
2502 cs_queue_item_add (queue_use, &message_item);
2524 cs_queue_avail (queue_use, &avail);
2535 static int orf_token_remcast (
2543 struct sq *sort_queue;
2551 res = sq_in_range (sort_queue, seq);
2560 res = sq_item_get (sort_queue, seq, &ptr);
2565 sort_queue_item = ptr;
2569 sort_queue_item->
mcast,
2579 static void messages_free (
2581 unsigned int token_aru)
2586 int log_release = 0;
2587 unsigned int release_to;
2588 unsigned int range = 0;
2590 release_to = token_aru;
2591 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2611 for (i = 1; i <= range; i++) {
2617 regular_message = ptr;
2618 totemsrp_buffer_release (instance, regular_message->
mcast);
2629 "releasing messages up to and including %x", release_to);
2633 static void update_aru (
2638 struct sq *sort_queue;
2640 unsigned int my_aru_saved = 0;
2650 my_aru_saved = instance->
my_aru;
2651 for (i = 1; i <= range; i++) {
2655 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2663 instance->
my_aru += i - 1;
2669 static int orf_token_mcast (
2672 int fcc_mcasts_allowed)
2676 struct sq *sort_queue;
2679 unsigned int fcc_mcast_current;
2684 reset_token_retransmit_timeout (instance);
2695 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2696 if (cs_queue_is_empty (mcast_queue)) {
2699 message_item = (
struct message_item *)cs_queue_item_get (mcast_queue);
2707 memset (&sort_queue_item, 0,
sizeof (
struct sort_queue_item));
2711 mcast = sort_queue_item.
mcast;
2718 sq_item_add (sort_queue, &sort_queue_item, message_item->
mcast->
seq);
2722 message_item->
mcast,
2728 cs_queue_item_remove (mcast_queue);
2736 update_aru (instance);
2741 return (fcc_mcast_current);
2748 static int orf_token_rtr (
2751 unsigned int *fcc_allowed)
2756 struct sq *sort_queue;
2758 unsigned int range = 0;
2759 char retransmit_msg[1024];
2768 rtr_list = &orf_token->
rtr_list[0];
2770 strcpy (retransmit_msg,
"Retransmit List: ");
2775 sprintf (value,
"%x ", rtr_list[i].seq);
2776 strcat (retransmit_msg, value);
2778 strcat (retransmit_msg,
"");
2780 "%s", retransmit_msg);
2793 if (memcmp (&rtr_list[i].ring_id, &instance->
my_ring_id,
2800 res = orf_token_remcast (instance, rtr_list[i].seq);
2807 memmove (&rtr_list[i], &rtr_list[i + 1],
2823 range = orf_token->
seq - instance->
my_aru;
2827 (i <= range); i++) {
2832 res = sq_in_range (sort_queue, instance->
my_aru + i);
2840 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2851 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2861 if (instance->
my_aru + i == rtr_list[j].
seq) {
2891 static void timer_function_token_retransmit_timeout (
void *data)
2901 token_retransmit (instance);
2902 reset_token_retransmit_timeout (instance);
2907 static void timer_function_token_hold_retransmit_timeout (
void *data)
2918 token_retransmit (instance);
2923 static void timer_function_merge_detect_timeout(
void *data)
2932 memb_merge_detect_transmit (instance);
2945 static int token_send (
2947 struct orf_token *orf_token,
2951 unsigned int orf_token_size;
2953 orf_token_size =
sizeof (
struct orf_token) +
2954 (orf_token->rtr_list_entries *
sizeof (
struct rtr_item));
2961 if (forward_token == 0) {
2998 sizeof (
struct token_hold_cancel));
3005 struct orf_token orf_token;
3037 res = token_send (instance, &orf_token, 1);
3042 static void memb_state_commit_token_update (
3047 unsigned int high_aru;
3083 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3084 high_aru = memb_list[i].
aru;
3094 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3109 static void memb_state_commit_token_target_set (
3126 static int memb_state_commit_token_send_recovery (
3128 struct memb_commit_token *commit_token)
3130 unsigned int commit_token_size;
3134 commit_token_size =
sizeof (
struct memb_commit_token) +
3135 ((sizeof (struct srp_addr) +
3136 sizeof (struct memb_commit_token_memb_entry)) * commit_token->
addr_entries);
3152 reset_token_retransmit_timeout (instance);
3156 static int memb_state_commit_token_send (
3159 unsigned int commit_token_size;
3163 commit_token_size =
sizeof (
struct memb_commit_token) +
3164 ((sizeof (struct srp_addr) +
3181 reset_token_retransmit_timeout (instance);
3188 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3189 int token_memb_entries = 0;
3193 memb_set_subtract (token_memb, &token_memb_entries,
3201 lowest_addr = &token_memb[0].
addr[0];
3202 for (i = 1; i < token_memb_entries; i++) {
3210 static int srp_addr_compare (
const void *a,
const void *b)
3218 static void memb_state_commit_token_create (
3221 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3224 int token_memb_entries = 0;
3227 "Creating commit token because I am the rep.");
3229 memb_set_subtract (token_memb, &token_memb_entries,
3233 memset (instance->
commit_token, 0, sizeof (
struct memb_commit_token));
3248 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3257 memcpy (addr, token_memb,
3258 token_memb_entries *
sizeof (
struct srp_addr));
3259 memset (memb_list, 0,
3265 char memb_join_data[40000];
3268 unsigned int addr_idx;
3277 msg_len =
sizeof(
struct memb_join) +
3278 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3280 if (msg_len >
sizeof(memb_join_data)) {
3281 log_printf (instance->totemsrp_log_level_error,
3282 "memb_join_message too long. Ignoring message.");
3287 memb_join->
ring_seq = instance->my_ring_id.seq;
3290 srp_addr_copy (&memb_join->
system_from, &instance->my_id);
3296 addr = (
char *)memb_join;
3297 addr_idx =
sizeof (
struct memb_join);
3298 memcpy (&addr[addr_idx],
3299 instance->my_proc_list,
3300 instance->my_proc_list_entries *
3303 instance->my_proc_list_entries *
3305 memcpy (&addr[addr_idx],
3306 instance->my_failed_list,
3307 instance->my_failed_list_entries *
3310 instance->my_failed_list_entries *
3313 if (instance->totem_config->send_join_timeout) {
3314 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3317 instance->stats.memb_join_tx++;
3320 instance->totemrrp_context,
3327 char memb_join_data[40000];
3330 unsigned int addr_idx;
3331 int active_memb_entries;
3332 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3336 "sending join/leave message");
3343 &instance->
my_id, 1,
3346 memb_set_subtract (active_memb, &active_memb_entries,
3348 &instance->
my_id, 1);
3350 msg_len =
sizeof(
struct memb_join) +
3351 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3353 if (msg_len >
sizeof(memb_join_data)) {
3354 log_printf (instance->totemsrp_log_level_error,
3355 "memb_leave message too long. Ignoring message.");
3365 memb_join->
ring_seq = instance->my_ring_id.seq;
3368 srp_addr_copy (&memb_join->
system_from, &instance->my_id);
3376 addr = (
char *)memb_join;
3377 addr_idx =
sizeof (
struct memb_join);
3378 memcpy (&addr[addr_idx],
3380 active_memb_entries *
3383 active_memb_entries *
3385 memcpy (&addr[addr_idx],
3386 instance->my_failed_list,
3387 instance->my_failed_list_entries *
3390 instance->my_failed_list_entries *
3394 if (instance->totem_config->send_join_timeout) {
3395 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3397 instance->stats.memb_join_tx++;
3400 instance->totemrrp_context,
3421 sizeof (
struct memb_merge_detect));
3424 static void memb_ring_id_set (
3443 token_hold_cancel_send (instance);
3446 if (callback_handle == 0) {
3449 *handle_out = (
void *)callback_handle;
3450 list_init (&callback_handle->
list);
3452 callback_handle->
data = (
void *) data;
3454 callback_handle->
delete =
delete;
3473 list_del (&h->
list);
3480 static void token_callbacks_execute (
3486 struct list_head *callback_listhead = 0;
3502 for (list = callback_listhead->
next; list != callback_listhead;
3505 token_callback_instance =
list_entry (list,
struct token_callback_instance, list);
3507 list_next = list->
next;
3508 del = token_callback_instance->
delete;
3515 token_callback_instance->
data);
3519 if (res == -1 && del == 1) {
3520 list_add (list, callback_listhead);
3522 free (token_callback_instance);
3546 if (queue_use != NULL) {
3547 backlog = cs_queue_used (queue_use);
3554 static int fcc_calculate (
3556 struct orf_token *token)
3558 unsigned int transmits_allowed;
3559 unsigned int backlog_calc;
3567 instance->
my_cbl = backlog_get (instance);
3576 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3577 transmits_allowed = backlog_calc;
3581 return (transmits_allowed);
3587 static void fcc_rtr_limit (
3589 struct orf_token *token,
3590 unsigned int *transmits_allowed)
3594 assert (check >= 0);
3601 *transmits_allowed = 0;
3605 static void fcc_token_update (
3607 struct orf_token *token,
3608 unsigned int msgs_transmitted)
3610 token->
fcc += msgs_transmitted - instance->
my_trc;
3612 instance->
my_trc = msgs_transmitted;
3619 static int check_totemip_sanity(
3622 int endian_conversion_needed)
3627 if (endian_conversion_needed) {
3631 if (family != AF_INET && family != AF_INET6) {
3633 "Received message corrupted... ignoring.");
3641 static int check_srpaddr_sanity(
3644 int endian_conversion_needed)
3652 for (i = 0; i < addr->
no_addrs; i++) {
3654 if (check_totemip_sanity(instance, &addr->
addr[i], endian_conversion_needed) == -1) {
3663 static int check_orf_token_sanity(
3667 int endian_conversion_needed)
3670 const struct orf_token *token = (
const struct orf_token *)msg;
3671 size_t required_len;
3674 if (msg_len <
sizeof(
struct orf_token)) {
3676 "Received orf_token message is too short... ignoring.");
3681 if (check_totemip_sanity(instance, &token->
ring_id.
rep, endian_conversion_needed) == -1) {
3685 if (endian_conversion_needed) {
3691 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3692 if (msg_len < required_len) {
3694 "Received orf_token message is too short... ignoring.");
3699 for (i = 0; i < rtr_entries; i++) {
3701 endian_conversion_needed) == -1) {
3709 static int check_mcast_sanity(
3713 int endian_conversion_needed)
3715 const struct mcast *mcast_msg = (
const struct mcast *)msg;
3717 if (msg_len <
sizeof(
struct mcast)) {
3719 "Received mcast message is too short... ignoring.");
3724 if ((check_totemip_sanity(instance, &mcast_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3725 (check_srpaddr_sanity(instance, &mcast_msg->
system_from, endian_conversion_needed) == -1)) {
3732 static int check_memb_merge_detect_sanity(
3736 int endian_conversion_needed)
3742 "Received memb_merge_detect message is too short... ignoring.");
3747 if ((check_totemip_sanity(instance, &mmd_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3748 (check_srpaddr_sanity(instance, &mmd_msg->
system_from, endian_conversion_needed) == -1)) {
3755 static int check_memb_join_sanity(
3759 int endian_conversion_needed)
3764 size_t required_len;
3766 const struct srp_addr *failed_list;
3769 if (msg_len <
sizeof(
struct memb_join)) {
3771 "Received memb_join message is too short... ignoring.");
3776 if (check_srpaddr_sanity(instance, &mj_msg->
system_from, endian_conversion_needed) == -1) {
3783 if (endian_conversion_needed) {
3784 proc_list_entries =
swab32(proc_list_entries);
3785 failed_list_entries =
swab32(failed_list_entries);
3788 required_len =
sizeof(
struct memb_join) + ((proc_list_entries + failed_list_entries) *
sizeof(
struct srp_addr));
3789 if (msg_len < required_len) {
3791 "Received memb_join message is too short... ignoring.");
3797 failed_list = proc_list + proc_list_entries;
3800 if (check_srpaddr_sanity(instance, &proc_list[i], endian_conversion_needed) == -1) {
3806 if (check_srpaddr_sanity(instance, &failed_list[i], endian_conversion_needed) == -1) {
3814 static int check_memb_commit_token_sanity(
3818 int endian_conversion_needed)
3820 const struct memb_commit_token *mct_msg = (
const struct memb_commit_token *)msg;
3824 size_t required_len;
3827 if (msg_len <
sizeof(
struct memb_commit_token)) {
3829 "Received memb_commit_token message is too short... ignoring.");
3834 if (check_totemip_sanity(instance, &mct_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3839 if (endian_conversion_needed) {
3840 addr_entries =
swab32(addr_entries);
3843 required_len =
sizeof(
struct memb_commit_token) +
3844 (addr_entries * (
sizeof(
struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3845 if (msg_len < required_len) {
3847 "Received memb_commit_token message is too short... ignoring.");
3853 memb_list = (
const struct memb_commit_token_memb_entry *)(addr +
addr_entries);
3856 if (check_srpaddr_sanity(instance, &addr[i], endian_conversion_needed) == -1) {
3860 if (memb_list[i].ring_id.
rep.
family != 0) {
3861 if (check_totemip_sanity(instance, &memb_list[i].ring_id.
rep,
3862 endian_conversion_needed) == -1) {
3871 static int check_token_hold_cancel_sanity(
3875 int endian_conversion_needed)
3881 "Received token_hold_cancel message is too short... ignoring.");
3886 if (check_totemip_sanity(instance, &thc_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3901 static int message_handler_orf_token (
3905 int endian_conversion_needed)
3907 char token_storage[1500];
3908 char token_convert[1500];
3909 struct orf_token *token = NULL;
3911 unsigned int transmits_allowed;
3912 unsigned int mcasted_retransmit;
3913 unsigned int mcasted_regular;
3914 unsigned int last_aru;
3917 unsigned long long tv_current;
3918 unsigned long long tv_diff;
3920 tv_current = qb_util_nano_current_get ();
3921 tv_diff = tv_current -
tv_old;
3922 tv_old = tv_current;
3925 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3928 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3935 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE 3936 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3941 if (endian_conversion_needed) {
3942 orf_token_endian_convert ((
struct orf_token *)msg,
3943 (
struct orf_token *)token_convert);
3944 msg = (
struct orf_token *)token_convert;
3951 token = (
struct orf_token *)token_storage;
3952 memcpy (token, msg,
sizeof (
struct orf_token));
3953 memcpy (&token->
rtr_list[0], (
char *)msg + sizeof (
struct orf_token),
3961 start_merge_detect_timeout (instance);
3964 cancel_merge_detect_timeout (instance);
3965 cancel_token_hold_retransmit_timeout (instance);
3971 #ifdef TEST_RECOVERY_MSG_COUNT 4012 messages_free (instance, token->
aru);
4031 reset_heartbeat_timeout(instance);
4034 cancel_heartbeat_timeout(instance);
4049 transmits_allowed = fcc_calculate (instance, token);
4050 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4058 fcc_rtr_limit (instance, token, &transmits_allowed);
4059 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4066 fcc_token_update (instance, token, mcasted_retransmit +
4069 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
4074 if (token->
aru == token->
seq) {
4080 if (token->
aru == last_aru && token->
aru_addr != 0) {
4095 "FAILED TO RECEIVE");
4099 memb_set_merge (&instance->
my_id, 1,
4126 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4139 "install seq %x aru %x high seq received %x",
4157 "retrans flag count %x token aru %x install seq %x aru %x %x",
4161 memb_state_operational_enter (instance);
4168 token_send (instance, token, forward_token);
4171 tv_current = qb_util_nano_current_get ();
4172 tv_diff = tv_current -
tv_old;
4173 tv_old = tv_current;
4176 ((
float)tv_diff) / 1000000.0);
4179 messages_deliver_to_app (instance, 0,
4187 reset_token_timeout (instance);
4188 reset_token_retransmit_timeout (instance);
4192 start_token_hold_retransmit_timeout (instance);
4202 reset_heartbeat_timeout(instance);
4205 cancel_heartbeat_timeout(instance);
4211 static void messages_deliver_to_app (
4214 unsigned int end_point)
4219 struct mcast *mcast_in;
4220 struct mcast mcast_header;
4221 unsigned int range = 0;
4222 int endian_conversion_required;
4223 unsigned int my_high_delivered_stored = 0;
4239 for (i = 1; i <= range; i++) {
4247 my_high_delivered_stored + i);
4253 my_high_delivered_stored + i, &ptr);
4257 if (res != 0 && skip == 0) {
4268 sort_queue_item_p = ptr;
4270 mcast_in = sort_queue_item_p->
mcast;
4271 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4273 endian_conversion_required = 0;
4275 endian_conversion_required = 1;
4276 mcast_endian_convert (mcast_in, &mcast_header);
4278 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4299 "Delivering MCAST message with seq %x to pending delivery queue",
4307 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4309 endian_conversion_required);
4316 static int message_handler_mcast (
4320 int endian_conversion_needed)
4323 struct sq *sort_queue;
4324 struct mcast mcast_header;
4326 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4330 if (endian_conversion_needed) {
4331 mcast_endian_convert (msg, &mcast_header);
4333 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4344 #ifdef TEST_DROP_MCAST_PERCENTAGE 4345 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4365 if (!memb_set_subset (
4392 "Received ringid(%s:%lld) seq %x",
4402 sq_in_range (sort_queue, mcast_header.
seq) &&
4403 sq_item_inuse (sort_queue, mcast_header.
seq) == 0) {
4409 sort_queue_item.
mcast = totemsrp_buffer_alloc (instance);
4410 if (sort_queue_item.
mcast == NULL) {
4413 memcpy (sort_queue_item.
mcast, msg, msg_len);
4414 sort_queue_item.
msg_len = msg_len;
4417 mcast_header.
seq)) {
4421 sq_item_add (sort_queue, &sort_queue_item, mcast_header.
seq);
4424 update_aru (instance);
4433 static int message_handler_memb_merge_detect (
4437 int endian_conversion_needed)
4441 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4445 if (endian_conversion_needed) {
4446 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4448 memcpy (&memb_merge_detect, msg,
4449 sizeof (
struct memb_merge_detect));
4466 memb_set_merge (&memb_merge_detect.
system_from, 1,
4472 if (!memb_set_subset (
4478 memb_set_merge (&memb_merge_detect.
system_from, 1,
4496 static void memb_join_process (
4502 int gather_entered = 0;
4503 int fail_minus_memb_entries = 0;
4504 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4520 "Discarding LEAVE message during flush, nodeid=%u",
4527 "Discarding JOIN message during flush, nodeid=%d", memb_join->
header.
nodeid);
4542 if (memb_set_equal (proc_list,
4547 memb_set_equal (failed_list,
4552 memb_consensus_set (instance, &memb_join->
system_from);
4554 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4561 memb_state_commit_token_create (instance);
4563 memb_state_commit_enter (instance);
4566 if (memb_consensus_agreed (instance) &&
4567 memb_lowest_in_config (instance)) {
4569 memb_state_commit_token_create (instance);
4571 memb_state_commit_enter (instance);
4576 if (memb_set_subset (proc_list,
4581 memb_set_subset (failed_list,
4593 memb_set_merge (proc_list,
4597 if (memb_set_subset (
4598 &instance->
my_id, 1,
4605 if (memb_set_subset (
4610 if (memb_set_subset (
4615 memb_set_merge (failed_list,
4619 memb_set_subtract (fail_minus_memb,
4620 &fail_minus_memb_entries,
4626 memb_set_merge (fail_minus_memb,
4627 fail_minus_memb_entries,
4638 if (gather_entered == 0 &&
4645 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4667 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4670 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4674 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out)
4695 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4700 if (in_memb_list[i].ring_id.
rep.
family != 0) {
4713 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4737 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4753 static void memb_merge_detect_endian_convert (
4765 static int ignore_join_under_operational (
4767 const struct memb_join *memb_join)
4777 if (memb_set_subset (&instance->
my_id, 1,
4795 static int message_handler_memb_join (
4799 int endian_conversion_needed)
4801 const struct memb_join *memb_join;
4802 struct memb_join *memb_join_convert = alloca (msg_len);
4804 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4808 if (endian_conversion_needed) {
4809 memb_join = memb_join_convert;
4810 memb_join_endian_convert (msg, memb_join_convert);
4820 if (pause_flush (instance)) {
4829 if (!ignore_join_under_operational (instance, memb_join)) {
4830 memb_join_process (instance, memb_join);
4835 memb_join_process (instance, memb_join);
4846 memb_join_process (instance, memb_join);
4859 memb_join_process (instance, memb_join);
4860 memb_recovery_state_token_loss (instance);
4868 static int message_handler_memb_commit_token (
4872 int endian_conversion_needed)
4874 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4875 struct memb_commit_token *memb_commit_token;
4876 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4882 "got commit token");
4884 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4888 if (endian_conversion_needed) {
4889 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4891 memcpy (memb_commit_token_convert, msg, msg_len);
4893 memb_commit_token = memb_commit_token_convert;
4896 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4897 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4907 memb_set_subtract (sub, &sub_entries,
4911 if (memb_set_equal (addr,
4917 memcpy (instance->
commit_token, memb_commit_token, msg_len);
4918 memb_state_commit_enter (instance);
4932 memb_state_recovery_enter (instance, memb_commit_token);
4947 "Sending initial ORF token");
4950 orf_token_send_initial (instance);
4951 reset_token_timeout (instance);
4952 reset_token_retransmit_timeout (instance);
4959 static int message_handler_token_hold_cancel (
4963 int endian_conversion_needed)
4967 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4976 timer_function_token_retransmit_timeout (instance);
4985 unsigned int msg_len)
4990 if (msg_len <
sizeof (
struct message_header)) {
4992 "Received message is too short... ignoring %u.",
4993 (
unsigned int)msg_len);
4998 switch (message_header->
type) {
5019 printf (
"wrong message type\n");
5036 unsigned int iface_no)
5052 "Created or loaded sequence id %llx.%s for this ring.",
5079 void (*totem_service_ready) (
void))
void(* totemsrp_service_ready_fn)(void)
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
void(*) enum memb_stat memb_state)
int totemrrp_iface_check(void *rrp_context)
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct srp_addr system_from
struct memb_ring_id ring_id
uint32_t waiting_trans_ack
struct srp_addr system_from
struct memb_ring_id ring_id
int totemsrp_log_level_debug
struct memb_ring_id my_ring_id
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
int my_leave_memb_entries
struct message_header header
unsigned int old_ring_state_high_seq_received
unsigned int proc_list_entries
struct totem_interface * interfaces
unsigned int interface_count
int totemsrp_my_family_get(void *srp_context)
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
const char * totemip_print(const struct totem_ip_address *addr)
int totemsrp_log_level_error
#define LEAVE_DUMMY_NODEID
struct memb_ring_id ring_id
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int failed_list_entries
unsigned char end_of_memb_join[0]
unsigned long long int tv_old
#define SEQNO_START_TOKEN
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_hold_timeout
struct memb_ring_id ring_id
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
int totemip_compare(const void *a, const void *b)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int ring_no)
void * token_sent_event_handle
struct srp_addr system_from
int totemsrp_log_level_notice
unsigned int proc_list_entries
unsigned int totemsrp_my_nodeid_get(void *srp_context)
char rrp_mode[TOTEM_RRP_MODE_BYTES]
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
int totemsrp_log_level_warning
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
void totemrrp_membership_changed(void *rrp_context, enum totem_configuration_type configuration_type, const struct srp_addr *member_list, size_t member_list_entries, const struct srp_addr *left_list, size_t left_list_entries, const struct srp_addr *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct message_header header
uint64_t memb_merge_detect_rx
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
struct cs_queue new_message_queue_trans
struct message_header header
unsigned char end_of_commit_token[0]
char commit_token_storage[40000]
unsigned int rrp_problem_count_timeout
struct list_head token_callback_sent_listhead
struct cs_queue new_message_queue
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
uint64_t gather_token_lost
int totemsrp_log_level_trace
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
int totemrrp_ifaces_get(void *rrp_context, char ***status, unsigned int *iface_count)
struct memb_ring_id my_old_ring_id
void * totemrrp_buffer_alloc(void *rrp_context)
unsigned int downcheck_timeout
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
uint64_t memb_commit_token_tx
int my_deliver_memb_entries
unsigned int max_network_delay
unsigned int heartbeat_failures_allowed
#define TOTEM_TOKEN_STATS_MAX
struct message_item __attribute__
unsigned long long token_ring_id_seq
struct totem_ip_address mcast_address
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
unsigned int send_join_timeout
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
unsigned int rrp_problem_count_threshold
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
uint64_t operational_entered
void(*) in log_level_security)
unsigned long long ring_seq
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
uint64_t operational_token_lost
unsigned int received_flg
uint64_t consensus_timeouts
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
struct message_handlers totemsrp_message_handlers
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
uint64_t recovery_token_lost
unsigned char end_of_memb_join[0]
unsigned int token_retransmits_before_loss_const
struct message_header header
int totemrrp_finalize(void *rrp_context)
struct list_head token_callback_received_listhead
int totemrrp_member_remove(void *rrp_context, const struct totem_ip_address *member, int iface_no)
struct rtr_item rtr_list[0]
int totemsrp_ring_reenable(void *srp_context)
struct memb_ring_id ring_id
unsigned int seqno_unchanged_const
uint64_t commit_token_lost
unsigned int miss_count_const
int totemrrp_crypto_set(void *rrp_context, const char *cipher_type, const char *hash_type)
uint64_t token_hold_cancel_rx
unsigned int join_timeout
uint32_t originated_orf_token
int totemrrp_send_flush(void *rrp_context)
struct message_header header
struct totem_ip_address mcast_addr
#define MESSAGE_QUEUE_MAX
int totemrrp_member_add(void *rrp_context, const struct totem_ip_address *member, int iface_no)
unsigned int received_flg
struct totem_ip_address rep
unsigned int last_released
int orf_token_retransmit_size
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
unsigned int rrp_autorecovery_check_timeout
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int fail_to_recv_const
void * token_recv_event_handle
struct totem_ip_address boundto
unsigned int my_high_seq_received
int totemrrp_initialize(qb_loop_t *poll_handle, void **rrp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no), void(*token_seqid_get)(const void *msg, unsigned int *seqid, unsigned int *token_is), unsigned int(*msgs_missing)(void), void(*target_set_completed)(void *context))
Create an instance.
qb_loop_t * totemsrp_poll_handle
qb_loop_timer_handle timer_pause_timeout
qb_loop_timer_handle timer_merge_detect_timeout
int my_merge_detect_timeout_outstanding
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
int totemsrp_log_level_security
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
int(* callback_fn)(enum totem_callback_token_type type, const void *)
qb_loop_timer_handle timer_orf_token_timeout
uint32_t continuous_gather
void totemsrp_threaded_mode_enable(void *context)
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totemmrp_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
int totemrrp_recv_flush(void *rrp_context)
uint32_t orf_token_discard
int my_failed_list_entries
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
uint64_t token_hold_cancel_tx
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_timeout
unsigned int high_delivered
unsigned int consensus_timeout
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len)
#define PROCESSOR_COUNT_MAX
unsigned short endian_detector
void totemrrp_buffer_release(void *rrp_context, void *ptr)
Totem Network interface - also does encryption/decryption.
char orf_token_retransmit[TOKEN_SIZE_MAX]
struct message_header header
struct sq regular_sort_queue
void totemsrp_finalize(void *srp_context)
#define QUEUE_RTR_ITEMS_SIZE_MAX
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
struct cs_queue retrans_message_queue
const char * gather_state_from_desc[]
qb_loop_timer_handle memb_timer_state_gather_join_timeout
int my_trans_memb_entries
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
uint64_t memb_merge_detect_tx
unsigned int high_delivered
struct rtr_item rtr_list[0]
int consensus_list_entries
unsigned int rrp_problem_count_mcast_threshold
int totemrrp_processor_count_set(void *rrp_context, unsigned int processor_count)
int totemrrp_mcast_noflush_send(void *rrp_context, const void *msg, unsigned int msg_len)
unsigned char end_of_commit_token[0]
uint32_t threaded_mode_enabled
enum totem_callback_token_type callback_type
int totemrrp_mcast_recv_empty(void *rrp_context)
#define list_entry(ptr, type, member)
unsigned long long ring_seq
struct totem_logging_configuration totem_logging_configuration
int totemrrp_mcast_flush_send(void *rrp_context, const void *msg, unsigned int msg_len)
struct memb_ring_id ring_id
#define log_printf(level, format, args...)
void totemsrp_trans_ack(void *context)
unsigned int max_messages
uint64_t recovery_entered
qb_loop_timer_handle memb_timer_state_commit_timeout
struct memb_commit_token * commit_token
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
struct srp_addr system_from
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
unsigned int merge_timeout
unsigned int use_heartbeat
struct message_header header
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int ring_no)
unsigned int token_retransmit_timeout
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
#define RETRANSMIT_ENTRIES_MAX
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
unsigned int my_token_seq
struct memb_ring_id ring_id
int totemrrp_ring_reenable(void *rrp_context, unsigned int iface_no)
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
struct totem_ip_address addr[INTERFACE_MAX]
unsigned int rrp_token_expired_timeout
struct memb_ring_id ring_id
unsigned int my_install_seq
int totemrrp_token_send(void *rrp_context, const void *msg, unsigned int msg_len)
unsigned int failed_list_entries
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
struct sq recovery_sort_queue
int totemrrp_token_target_set(void *rrp_context, struct totem_ip_address *addr, unsigned int iface_no)
totem_callback_token_type
unsigned int my_high_ring_delivered