From 326ecfecd9f098cfc7ffd4507d0670d6da20af67 Mon Sep 17 00:00:00 2001 From: Simon Pickartz Date: Mon, 14 Nov 2016 17:05:07 +0100 Subject: [PATCH 1/3] first working version --- lib/pscom/pscom_con.c | 2 ++ lib/pscom/pscom_io.c | 7 +++++ lib/pscom/pscom_migrate.c | 58 +++++++++++++++++++++++++++++++++------ lib/pscom/pscom_migrate.h | 2 +- lib/pscom/pscom_priv.h | 4 +++ lib/pscom/pscom_sock.c | 1 + 6 files changed, 64 insertions(+), 10 deletions(-) diff --git a/lib/pscom/pscom_con.c b/lib/pscom/pscom_con.c index 5d12ffeb..17c96900 100644 --- a/lib/pscom/pscom_con.c +++ b/lib/pscom/pscom_con.c @@ -387,6 +387,8 @@ pscom_con_t *pscom_con_create(pscom_sock_t *sock) INIT_LIST_HEAD(&con->poll_reader.next); INIT_LIST_HEAD(&con->poll_next_send); + INIT_LIST_HEAD(&con->shutdown_requested); + con->con_guard.fd = -1; con->precon = NULL; con->in.req = 0; diff --git a/lib/pscom/pscom_io.c b/lib/pscom/pscom_io.c index 58a530ce..6dfb0de0 100644 --- a/lib/pscom/pscom_io.c +++ b/lib/pscom/pscom_io.c @@ -743,6 +743,7 @@ void pscom_reset_con_to_ondemand(pscom_con_t *con) memcpy(remote_name, connection->remote_con_info.name, 8); /* close the connection */ + DPRINT(1, "####### RESET CON: %s", pscom_con_info_str(&connection->remote_con_info)); con->close(con); list_del_init(&con->next); @@ -876,6 +877,12 @@ pscom_req_t *pscom_get_shutdown_receiver(pscom_con_t *con, pscom_header_net_t *n req->pub.ops.io_done = pscom_shutdown_req_receiver_io_done; } else { assert(nh->msg_type == PSCOM_MSGTYPE_SHUTDOWN_ACK); + DPRINT(1, "####### CONNECTION TYPE: %x", connection->type); + DPRINT(1, "####### CONNECTION NODE: %s", pscom_con_info_str(&connection->remote_con_info)); + + if (pscom.migration_state != PSCOM_MIGRATION_PREPARING) { + DPRINT(1, "##### NOOOOOOOOOO"); + } assert(pscom.migration_state == PSCOM_MIGRATION_PREPARING); req->pub.ops.io_done = pscom_shutdown_ack_receiver_io_done; } diff --git a/lib/pscom/pscom_migrate.c b/lib/pscom/pscom_migrate.c index e42e2423..2d2ee0e5 100644 --- a/lib/pscom/pscom_migrate.c +++ b/lib/pscom/pscom_migrate.c @@ -82,7 +82,7 @@ int pscom_suspend_plugins(void) /* suspend listen FD: */ pscom_suspend_listen(&sock->pub); - /* iterate over all connections */ + /* iterate over all connections and post suspend request */ struct list_head *tmp_con; list_for_each_safe(pos_con, tmp_con, &sock->connections) { pscom_con_t *con = list_entry(pos_con, @@ -107,18 +107,58 @@ int pscom_suspend_plugins(void) if (plugin->properties & PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) { - pscom_con_shutdown(con); - - /* wait for response */ - while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) { - con->read_start(con); - con->write_start(con); - pscom_test_any(); - } + DPRINT(1, "SUSPENDING %s (type: %s)", pscom_con_info_str(&con->pub.remote_con_info),pscom_con_type_str(con->pub.type)); + pscom_con_shutdown(con); + list_add_tail(&con->shutdown_requested, &sock->shutdown_connections); + } } } + /* wait for all connections to be suspended */ + list_for_each(pos_sock, &pscom.sockets) { + pscom_sock_t *sock = list_entry(pos_sock, pscom_sock_t, next); + + /* iterate over all connections and post suspend request */ + struct list_head *tmp_con; + list_for_each_safe(pos_con, tmp_con, &sock->shutdown_connections) { + pscom_con_t *con = list_entry(pos_con, + pscom_con_t, + shutdown_requested); + + DPRINT(1, "WAITING for %s (type: %s)", pscom_con_info_str(&con->pub.remote_con_info),pscom_con_type_str(con->pub.type)); + while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) { + con->read_start(con); + con->write_start(con); + pscom_test_any(); + } + + list_del_init(&con->shutdown_requested); + + +// /* determine corresponding plugin */ +// arch = PSCOM_CON_TYPE2ARCH(con->pub.type); +// plugin = pscom_plugin_by_archid(arch); +// +// /* go to next connection if plugin not set */ +// if (plugin == NULL) +// continue; +// +// +// if (plugin->properties & +// PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) { +// DPRINT(1, "Wait for suspending of %s (type: %s)", pscom_con_info_str(&con->pub.remote_con_info),pscom_con_type_str(con->pub.type)); +// while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) { +// DPRINT(1, "Waiting ..."); +// con->read_start(con); +// con->write_start(con); +// pscom_test_any(); +// } +// } + } + } + + /* * Shutdown non-migratable plugins */ diff --git a/lib/pscom/pscom_migrate.h b/lib/pscom/pscom_migrate.h index 84205883..7ca4216c 100644 --- a/lib/pscom/pscom_migrate.h +++ b/lib/pscom/pscom_migrate.h @@ -24,7 +24,7 @@ #define PSCOM_MOSQUITTO_TOPIC_LENGTH 50 #define PSCOM_MOSQUITTO_REQ_TOPIC "fast/pscom///request" #define PSCOM_MOSQUITTO_RESP_TOPIC "fast/pscom///response" -#define PSCOM_BROKER_HOST "devon" +#define PSCOM_BROKER_HOST "zerberus" #define PSCOM_BROKER_PORT 1883 #define PSCOM_KEEP_ALIVE_INT 60 diff --git a/lib/pscom/pscom_priv.h b/lib/pscom/pscom_priv.h index 87768b48..9ac06e67 100644 --- a/lib/pscom/pscom_priv.h +++ b/lib/pscom/pscom_priv.h @@ -291,6 +291,8 @@ struct PSCOM_con pscom_poll_reader_t poll_reader; struct list_head poll_next_send; // used by pscom.poll_sender + struct list_head shutdown_requested; // List for connections to be shut down + struct con_guard con_guard; // connection guard struct { @@ -346,6 +348,8 @@ struct PSCOM_sock struct list_head groups; // List of pscom_group_t.next struct list_head group_req_unknown; // List of pscom_req_t.next (requests with unknown group id) + struct list_head shutdown_connections; // List of pscom_con_t.shutdown_requested + struct pscom_listener { ufd_info_t ufd_info; // TCP listen for new connections unsigned usercnt; // Count the users of the listening fd. (keep fd open, if > 0) diff --git a/lib/pscom/pscom_sock.c b/lib/pscom/pscom_sock.c index c05dd983..56e6ef62 100644 --- a/lib/pscom/pscom_sock.c +++ b/lib/pscom/pscom_sock.c @@ -154,6 +154,7 @@ pscom_sock_t *pscom_sock_create(unsigned int userdata_size) INIT_LIST_HEAD(&sock->group_req_unknown); INIT_LIST_HEAD(&sock->pendingioq); INIT_LIST_HEAD(&sock->sendq_suspending); + INIT_LIST_HEAD(&sock->shutdown_connections); sock->recv_req_cnt_any = 0; From 49656719f23a71686d035ba80df45a71b8a75810 Mon Sep 17 00:00:00 2001 From: Simon Pickartz Date: Fri, 2 Dec 2016 13:46:46 +0100 Subject: [PATCH 2/3] some cleanups --- lib/pscom/pscom_io.c | 7 --- lib/pscom/pscom_migrate.c | 120 +++++++++++++++++--------------------- lib/pscom/pscom_priv.h | 2 +- 3 files changed, 55 insertions(+), 74 deletions(-) diff --git a/lib/pscom/pscom_io.c b/lib/pscom/pscom_io.c index 6dfb0de0..58a530ce 100644 --- a/lib/pscom/pscom_io.c +++ b/lib/pscom/pscom_io.c @@ -743,7 +743,6 @@ void pscom_reset_con_to_ondemand(pscom_con_t *con) memcpy(remote_name, connection->remote_con_info.name, 8); /* close the connection */ - DPRINT(1, "####### RESET CON: %s", pscom_con_info_str(&connection->remote_con_info)); con->close(con); list_del_init(&con->next); @@ -877,12 +876,6 @@ pscom_req_t *pscom_get_shutdown_receiver(pscom_con_t *con, pscom_header_net_t *n req->pub.ops.io_done = pscom_shutdown_req_receiver_io_done; } else { assert(nh->msg_type == PSCOM_MSGTYPE_SHUTDOWN_ACK); - DPRINT(1, "####### CONNECTION TYPE: %x", connection->type); - DPRINT(1, "####### CONNECTION NODE: %s", pscom_con_info_str(&connection->remote_con_info)); - - if (pscom.migration_state != PSCOM_MIGRATION_PREPARING) { - DPRINT(1, "##### NOOOOOOOOOO"); - } assert(pscom.migration_state == PSCOM_MIGRATION_PREPARING); req->pub.ops.io_done = pscom_shutdown_ack_receiver_io_done; } diff --git a/lib/pscom/pscom_migrate.c b/lib/pscom/pscom_migrate.c index 2d2ee0e5..b467a702 100644 --- a/lib/pscom/pscom_migrate.c +++ b/lib/pscom/pscom_migrate.c @@ -65,7 +65,7 @@ pscom_str_replace(char *search_str, char *replace_str, char *str) static int pscom_suspend_plugins(void) -{ +{ struct list_head *pos_sock; struct list_head *pos_con; int arch; @@ -86,13 +86,13 @@ int pscom_suspend_plugins(void) struct list_head *tmp_con; list_for_each_safe(pos_con, tmp_con, &sock->connections) { pscom_con_t *con = list_entry(pos_con, - pscom_con_t, + pscom_con_t, next); - + /* determine corresponding plugin */ arch = PSCOM_CON_TYPE2ARCH(con->pub.type); plugin = pscom_plugin_by_archid(arch); - + /* suspend all still pending on-demand connections, too */ if(con->pub.type == PSCOM_CON_TYPE_ONDEMAND) { con->read_suspend(con); @@ -104,10 +104,14 @@ int pscom_suspend_plugins(void) continue; /* shutdown the connection if not migratable */ - if (plugin->properties & + if (plugin->properties & PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) { - - DPRINT(1, "SUSPENDING %s (type: %s)", pscom_con_info_str(&con->pub.remote_con_info),pscom_con_type_str(con->pub.type)); + + DPRINT(3, "%s:%u: send REQ to %s (type: %s)", + __FILE__, + __LINE__, + pscom_con_info_str(&con->pub.remote_con_info), + pscom_con_type_str(con->pub.type)); pscom_con_shutdown(con); list_add_tail(&con->shutdown_requested, &sock->shutdown_connections); @@ -123,10 +127,15 @@ int pscom_suspend_plugins(void) struct list_head *tmp_con; list_for_each_safe(pos_con, tmp_con, &sock->shutdown_connections) { pscom_con_t *con = list_entry(pos_con, - pscom_con_t, + pscom_con_t, shutdown_requested); - DPRINT(1, "WAITING for %s (type: %s)", pscom_con_info_str(&con->pub.remote_con_info),pscom_con_type_str(con->pub.type)); + DPRINT(3, "%s:%u: wait for ACK from %s (type: %s)", + __FILE__, + __LINE__, + pscom_con_info_str(&con->pub.remote_con_info), + pscom_con_type_str(con->pub.type)); + while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) { con->read_start(con); con->write_start(con); @@ -134,35 +143,14 @@ int pscom_suspend_plugins(void) } list_del_init(&con->shutdown_requested); - - -// /* determine corresponding plugin */ -// arch = PSCOM_CON_TYPE2ARCH(con->pub.type); -// plugin = pscom_plugin_by_archid(arch); -// -// /* go to next connection if plugin not set */ -// if (plugin == NULL) -// continue; -// -// -// if (plugin->properties & -// PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) { -// DPRINT(1, "Wait for suspending of %s (type: %s)", pscom_con_info_str(&con->pub.remote_con_info),pscom_con_type_str(con->pub.type)); -// while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) { -// DPRINT(1, "Waiting ..."); -// con->read_start(con); -// con->write_start(con); -// pscom_test_any(); -// } -// } } + assert(list_empty(&sock->shutdown_connections)); } - + /* * Shutdown non-migratable plugins */ - DPRINT(1, "%s %u: Find non-migratable plugins ...", __FILE__, __LINE__); struct list_head *pos_plugin; list_for_each(pos_plugin, &pscom_plugins) { @@ -170,17 +158,17 @@ int pscom_suspend_plugins(void) if ((plugin->properties & PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) && (plugin->destroy)) { - DPRINT(1, - "%s %u: Destroying '%s' ...", - __FILE__, + DPRINT(1, + "%s %u: Destroying '%s' ...", + __FILE__, __LINE__, plugin->name); if(plugin->destroy) { plugin->destroy(); } - DPRINT(1, - "%s %u: Successfully destroyed '%s'!", - __FILE__, + DPRINT(1, + "%s %u: Successfully destroyed '%s'!", + __FILE__, __LINE__, plugin->name); } @@ -218,10 +206,10 @@ int pscom_resume_plugins(void) /* iterate over all connections */ list_for_each(pos_con, &sock->connections) { pscom_con_t *con = list_entry(pos_con, - pscom_con_t, + pscom_con_t, next); /* resume connections */ - pscom_resume_connection(&con->pub); + pscom_resume_connection(&con->pub); } } @@ -238,8 +226,8 @@ int pscom_resume_non_migratable_plugins(void) } static -void pscom_message_callback(struct mosquitto *mosquitto_client, - void *arg, +void pscom_message_callback(struct mosquitto *mosquitto_client, + void *arg, const struct mosquitto_message *message) { int pid = -1; @@ -254,7 +242,7 @@ void pscom_message_callback(struct mosquitto *mosquitto_client, pid = -2; } else { msg = strtok(payload, " "); - + while (msg) { sscanf(msg, "%d", &pid); if (pid == my_pid) @@ -265,7 +253,7 @@ void pscom_message_callback(struct mosquitto *mosquitto_client, } else { pid = -2; } - + if (pid == my_pid || pid == -2) { @@ -303,10 +291,10 @@ void pscom_message_callback(struct mosquitto *mosquitto_client, // assert(0); break; - default: + default: DPRINT(1, "%s %d: ERROR: Unknown migration state (%d). " - "Abort!", - __FILE__, __LINE__, + "Abort!", + __FILE__, __LINE__, pscom.migration_state); assert(0); } @@ -338,7 +326,7 @@ void pscom_report_to_migfra(const char *status) strerror(errno)); exit(-1); } - + /* reset migration state */ pscom.migration_state = PSCOM_MIGRATION_INACTIVE; } @@ -388,11 +376,11 @@ void pscom_migration_handle_shutdown_req(void) false); if (err != MOSQ_ERR_SUCCESS) { fprintf(stderr, "%s %d: ERROR: Could not publish on '%s' - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, PSCOM_MOSQUITTO_RESP_TOPIC, err, - errno, + errno, strerror(errno)); exit(-1); } @@ -402,7 +390,7 @@ void pscom_migration_handle_shutdown_req(void) sched_yield(); } - /* resume the connections now */ + /* resume the connections now */ pscom_migration_handle_resume_req(); } @@ -413,7 +401,7 @@ int pscom_migration_init(void) /* leave if feature should be disabled */ if (pscom.env.suspend_resume == 0) return 0; - + /* initialize libmosquitto */ if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could not init libmosquitto ", @@ -421,24 +409,24 @@ int pscom_migration_init(void) return PSCOM_ERR_STDERROR; } - /* create a new mosquitto client */ + /* create a new mosquitto client */ char client_name[PSCOM_MOSQUITTO_CLIENT_NAME_LENGTH]; char my_pid[10]; sprintf(my_pid, "_%d", getpid()); gethostname(client_name, PSCOM_MOSQUITTO_CLIENT_NAME_LENGTH); strcat(client_name, my_pid); - pscom_mosquitto_client = mosquitto_new(client_name, + pscom_mosquitto_client = mosquitto_new(client_name, true, NULL); if (pscom_mosquitto_client == NULL) { DPRINT(1, "%s %d: ERROR: Could create new mosquitto client " - "instance (%d [%s])", + "instance (%d [%s])", __FILE__, __LINE__, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } - + /* connect to mosquitto broker in BLOCKING manner */ int err; err = mosquitto_connect(pscom_mosquitto_client, @@ -447,10 +435,10 @@ int pscom_migration_init(void) PSCOM_KEEP_ALIVE_INT); if ( err != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could connect to the broker - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, err, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } else { @@ -476,11 +464,11 @@ int pscom_migration_init(void) 0); if (err != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could not subscribe to '%s' - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, pscom_mosquitto_req_topic, err, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } @@ -491,7 +479,7 @@ int pscom_migration_init(void) /* set the subscription callback */ mosquitto_message_callback_set(pscom_mosquitto_client, &pscom_message_callback); - + /* start the communication loop */ err = mosquitto_loop_start(pscom_mosquitto_client); if ( err != MOSQ_ERR_SUCCESS) { @@ -510,17 +498,17 @@ int pscom_migration_cleanup(void) { int err; - /* unsubscribe from the migration command topic */ + /* unsubscribe from the migration command topic */ err = mosquitto_unsubscribe(pscom_mosquitto_client, NULL, PSCOM_MOSQUITTO_REQ_TOPIC); if (err != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could not unsubscribe from '%s' - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, PSCOM_MOSQUITTO_REQ_TOPIC, err, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } @@ -546,7 +534,7 @@ int pscom_migration_cleanup(void) return PSCOM_ERR_STDERROR; } - /* destroy the mosquitto client */ + /* destroy the mosquitto client */ mosquitto_destroy(pscom_mosquitto_client); /* cleanup libmosquitto */ diff --git a/lib/pscom/pscom_priv.h b/lib/pscom/pscom_priv.h index 9ac06e67..980e32f3 100644 --- a/lib/pscom/pscom_priv.h +++ b/lib/pscom/pscom_priv.h @@ -291,7 +291,7 @@ struct PSCOM_con pscom_poll_reader_t poll_reader; struct list_head poll_next_send; // used by pscom.poll_sender - struct list_head shutdown_requested; // List for connections to be shut down + struct list_head shutdown_requested; // user by S/R protocol struct con_guard con_guard; // connection guard From 5b9d716db3e9ecdef3adcd99e84b268d94739501 Mon Sep 17 00:00:00 2001 From: Simon Pickartz Date: Thu, 2 Feb 2017 09:11:08 +0100 Subject: [PATCH 3/3] support S/R interface from sr-threaded branch --- lib/pscom/pscom_migrate.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pscom/pscom_migrate.c b/lib/pscom/pscom_migrate.c index b467a702..e4a112dd 100644 --- a/lib/pscom/pscom_migrate.c +++ b/lib/pscom/pscom_migrate.c @@ -238,7 +238,7 @@ void pscom_message_callback(struct mosquitto *mosquitto_client, if ((char*)message->payload) { strcpy(payload, (char*)message->payload); - if (!strcmp(payload, "*")) { + if (!strcmp(payload, "*") || !strcmp(payload, "resume") || !strcmp(payload, "suspend")) { pid = -2; } else { msg = strtok(payload, " ");