| summaryrefslogtreecommitdiff | 
diff options
| author | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-15 12:48:19 +0200 | 
|---|---|---|
| committer | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-15 12:48:19 +0200 | 
| commit | 52d894c670cf9b9ae5c5276acfa7938fb4fbacd3 (patch) | |
| tree | 96d665b66589166f0ccf6ae9315c19b799562646 /src | |
Initial commit.
Diffstat (limited to 'src')
29 files changed, 1997 insertions, 0 deletions
| diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..0b242bc --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,12 @@ +add_subdirectory(error) +add_subdirectory(core) +add_subdirectory(filter) +add_subdirectory(server) +add_subdirectory(parameters) + +set( +   SRC_FILES ${SRC_FILES} +   ${CMAKE_CURRENT_SOURCE_DIR}/main.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt new file mode 100644 index 0000000..fa07534 --- /dev/null +++ b/src/core/CMakeLists.txt @@ -0,0 +1,6 @@ +set( +   SRC_FILES ${SRC_FILES} +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/core/index.h b/src/core/index.h new file mode 100644 index 0000000..594a1d2 --- /dev/null +++ b/src/core/index.h @@ -0,0 +1,6 @@ +#ifndef _JH_CORE_INDEX_H_ +#define _JH_CORE_INDEX_H_ + +#include "index_types.h" + +#endif diff --git a/src/core/index_types.h b/src/core/index_types.h new file mode 100644 index 0000000..2180815 --- /dev/null +++ b/src/core/index_types.h @@ -0,0 +1,28 @@ +#ifndef _JH_CORE_INDEX_TYPES_H_ +#define _JH_CORE_INDEX_TYPES_H_ + +#include "../pervasive.h" + +/* + * JH_index is a replacement for size_t. As many indices are stored for every + * word learned, having control over which type of variable is used to represent + * those indices lets us scale the RAM usage. + */ + +#include <limits.h> +#include <stdint.h> + +/* Must be unsigned. */ +typedef unsigned int JH_index; + +/* Must be > 0. */ +#define JH_INDEX_MAX UINT_MAX +#define JH_INDEX_TAG "%u" + +#ifndef JH_RUNNING_FRAMA_C +   #if (JH_INDEX_MAX > SIZE_MAX) +      #error "JH_index should not be able to go higher than a size_t variable." +   #endif +#endif + +#endif diff --git a/src/error/CMakeLists.txt b/src/error/CMakeLists.txt new file mode 100644 index 0000000..fa07534 --- /dev/null +++ b/src/error/CMakeLists.txt @@ -0,0 +1,6 @@ +set( +   SRC_FILES ${SRC_FILES} +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/error/error.h b/src/error/error.h new file mode 100644 index 0000000..145c838 --- /dev/null +++ b/src/error/error.h @@ -0,0 +1,143 @@ +#ifndef _JH_ERROR_ERROR_H_ +#define _JH_ERROR_ERROR_H_ + +#include <stdio.h> + +#include "../pervasive.h" + +#ifndef JH_DEBUG_PROGRAM_FLOW +   #define JH_DEBUG_PROGRAM_FLOW   (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_CONFIG +   #define JH_DEBUG_CONFIG         (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_LEARNING +   #define JH_DEBUG_LEARNING       (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_NETWORK +   #define JH_DEBUG_NETWORK  1 +#endif + +#ifndef JH_DEBUG_NETWORK +   #define JH_DEBUG_NETWORK        (0 || JH_DEBUG_ALL) +#endif + +#define JH_ENABLE_WARNINGS_OUTPUT              1 +#define JH_ENABLE_RUNTIME_ERRORS_OUTPUT        1 +#define JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT    1 +#define JH_ENABLE_FATAL_ERROR_OUTPUT           1 + +#ifdef JH_ENABLE_ERROR_LOCATION +   #define JH_LOCATION " [" __FILE__ "][" JH_TO_STRING(__LINE__) "]" +#else +   #define JH_LOCATION "" +#endif + +#define JH_PRINT_STDERR(io, symbol, str, ...)\ +   fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n", __VA_ARGS__); + +/* + * Given that we use preprocessor contants as flags, we can expect the compilers + * to remove the test condition for disabled flags. No need to be shy about + * allowing many debug options. + */ + +#define JH_DEBUG(io, flag, str, ...)\ +   JH_ISOLATE\ +   (\ +      if (flag)\ +      {\ +         JH_PRINT_STDERR(io, "D", str, __VA_ARGS__);\ +      }\ +   ) + + +#define JH_WARNING(io, str, ...)\ +   JH_ISOLATE\ +   (\ +      if (JH_ENABLE_WARNINGS_OUTPUT)\ +      {\ +         JH_PRINT_STDERR(io, "W", str, __VA_ARGS__);\ +      }\ +   ) + +#define JH_ERROR(io, str, ...)\ +   JH_ISOLATE\ +   (\ +      if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ +      {\ +         JH_PRINT_STDERR(io, "E", str, __VA_ARGS__);\ +      }\ +   ) + +#define JH_PROG_ERROR(io, str, ...)\ +   JH_ISOLATE\ +   (\ +      if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ +      {\ +         JH_PRINT_STDERR(io, "P", str, __VA_ARGS__);\ +      }\ +   ) + +#define JH_FATAL(io, str, ...)\ +   JH_ISOLATE\ +   (\ +     if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ +      {\ +         JH_PRINT_STDERR(io, "F", str, __VA_ARGS__);\ +      }\ +   ) + +/* For outputs without dynamic content (static). ******************************/ + +#define JH_PRINT_S_STDERR(io, symbol, str)\ +   fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n"); + +#define JH_S_DEBUG(io, flag, str)\ +   JH_ISOLATE\ +   (\ +      if (flag)\ +      {\ +         JH_PRINT_S_STDERR(io, "D", str);\ +      }\ +   ) + +#define JH_S_WARNING(io, str)\ +   JH_ISOLATE\ +   (\ +      if (JH_ENABLE_WARNINGS_OUTPUT)\ +      {\ +         JH_PRINT_S_STDERR(io, "W", str);\ +      }\ +   ) + +#define JH_S_ERROR(io, str)\ +   JH_ISOLATE\ +   (\ +      if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ +      {\ +         JH_PRINT_S_STDERR(io, "E", str);\ +      }\ +   ) + +#define JH_S_PROG_ERROR(io, str)\ +   JH_ISOLATE\ +   (\ +      if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ +      {\ +         JH_PRINT_S_STDERR(io, "P", str);\ +      }\ +   ) + +#define JH_S_FATAL(io, str)\ +   JH_ISOLATE\ +   (\ +     if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ +      {\ +         JH_PRINT_S_STDERR(io, "F", str);\ +      }\ +   ) +#endif diff --git a/src/filter/CMakeLists.txt b/src/filter/CMakeLists.txt new file mode 100644 index 0000000..c8add44 --- /dev/null +++ b/src/filter/CMakeLists.txt @@ -0,0 +1,6 @@ +set( +   SRC_FILES ${SRC_FILES} +   ${CMAKE_CURRENT_SOURCE_DIR}/filter.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/filter/filter.c b/src/filter/filter.c new file mode 100644 index 0000000..f668756 --- /dev/null +++ b/src/filter/filter.c @@ -0,0 +1,368 @@ +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "../pervasive.h" +#include "../error/error.h" +#include "../parameters/parameters.h" + +#include "filter.h" + +char to_lowercase (const char c) +{ +   if ((c >= 'A') && (c <= 'Z')) +   { +      return 'z' - ('Z' - c); +   } + +   return c; +} + +static int send_downstream +( +   struct JH_filter filter [const restrict static 1], +   const int upstream_socket, +   const int downstream_socket +) +{ +   char c; +   ssize_t io_bytes; +   const int old_errno = errno; + +   for (;;) +   { +      errno = 0; + +      io_bytes = +         read +         ( +            upstream_socket, +            (void *) &c, +            sizeof(char) +         ); + +      if (io_bytes == -1) +      { +         JH_ERROR +         ( +            stderr, +            "Upstream read error %d (\"%s\").", +            errno, +            strerror(errno) +         ); + +         errno = old_errno; + +         return -1; +      } +      else if (io_bytes == 0) +      { +         return 1; +      } + +      switch (filter->buffer_index) +      { +         case 0: +            if (c == '?') +            { +               filter->buffer_index = 1; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         case 1: +            if (c == 'R') +            { +               filter->buffer_index = 2; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         case 2: +            if ((c == 'L') || (c == 'R')) +            { +               filter->prev_c = c; +               filter->buffer_index = 3; +            } +            else +            { +               filter->prev_c = '\0'; +               filter->buffer_index = -1; +            } +            break; + +         case 3: +            if (c == ' ') +            { +               filter->buffer_index = JH_FILTER_BUFFER_SIZE; +            } +            else if ((c == 'R') && (filter->prev_c == 'L')) +            { +               filter->buffer_index = 4; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         case 4: +            if (c == ' ') +            { +               filter->buffer_index = JH_FILTER_BUFFER_SIZE; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         case JH_FILTER_BUFFER_SIZE: +            c = to_lowercase(c); +            break; + +         default: +            break; +      } + +      errno = 0; + +      io_bytes = write +      ( +         downstream_socket, +         (void *) &c, +         sizeof(char) +      ); + +      if (io_bytes == -1) +      { +         JH_ERROR +         ( +            stderr, +            "Upstream write error %d (\"%s\").", +            errno, +            strerror(errno) +         ); + +         errno = old_errno; + +         return -1; +      } + +      errno = old_errno; + +      if (c == '\n') +      { +         filter->buffer_index = 0; +         filter->state = JH_FILTER_IS_SENDING_UPSTREAM; + +         return 0; +      } +   } +} + +static int send_upstream +( +   struct JH_filter filter [const restrict static 1], +   const int upstream_socket, +   const int downstream_socket +) +{ +   char c; +   ssize_t io_bytes; +   const int old_errno = errno; + +   for (;;) +   { +      errno = 0; + +      io_bytes = +         read +         ( +            downstream_socket, +            (void *) &c, +            sizeof(char) +         ); + +      if (io_bytes == -1) +      { +         JH_ERROR +         ( +            stderr, +            "Downstream read error %d (\"%s\").", +            errno, +            strerror(errno) +         ); + +         errno = old_errno; + +         return -1; +      } +      else if (io_bytes == 0) +      { +         return 1; +      } + +      errno = 0; + +      io_bytes = write +      ( +         upstream_socket, +         (void *) &c, +         sizeof(char) +      ); + +      if (io_bytes == -1) +      { +         JH_ERROR +         ( +            stderr, +            "Upstream write error %d (\"%s\").", +            errno, +            strerror(errno) +         ); + +         errno = old_errno; + +         return -1; +      } + +      errno = old_errno; + +      switch (filter->buffer_index) +      { +         case -1: +            if (c == '\n') +            { +               filter->buffer_index = 0; +            } +            break; + +         case 0: +            if (c == '!') +            { +               filter->buffer_index = 1; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         case 1: +            if ((c == 'N') || (c == 'P')) +            { +               filter->buffer_index = 2; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         case 2: +            if (c == ' ') +            { +               filter->buffer_index = 3; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         case 3: +            if (c == '\n') +            { +               filter->buffer_index = 0; +               filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM; + +               return 0; +            } +            else +            { +               filter->buffer_index = -1; +            } +            break; + +         default: +            JH_PROG_ERROR +            ( +               stderr, +               "Invalid value for 'filter->buffer_index': %d.", +               filter->buffer_index +            ); + +            filter->buffer_index = 0; + +            return -1; +      } +   } + +   return -1; +} + +/******************************************************************************/ +/** EXPORTED ******************************************************************/ +/******************************************************************************/ + +int JH_filter_step +( +   struct JH_filter filter [const restrict static 1], +   const int upstream_socket, +   const int downstream_socket +) +{ +   switch (filter->state) +   { +      case JH_FILTER_IS_SENDING_DOWNSTREAM: +         JH_DEBUG(stderr, 1, "<SENDING_DOWN> (index: %d)", filter->buffer_index); +         return +            send_downstream +            ( +               filter, +               upstream_socket, +               downstream_socket +            ); + +      case JH_FILTER_IS_SENDING_UPSTREAM: +         JH_DEBUG(stderr, 1, "<SENDING_UP> (index: %d)", filter->buffer_index); +         return +            send_upstream +            ( +               filter, +               upstream_socket, +               downstream_socket +            ); + +      default: +         return -1; +   } +} + +int JH_filter_initialize +( +   struct JH_filter filter [const restrict static 1] +) +{ +   filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM; +   filter->buffer_index = 0; +   filter->prev_c = '\0'; + +   return 0; +} + +void JH_filter_finalize +( +   struct JH_filter filter [const restrict static 1] +) +{ +   /* Nothing to do */ +} diff --git a/src/filter/filter.h b/src/filter/filter.h new file mode 100644 index 0000000..0b17fde --- /dev/null +++ b/src/filter/filter.h @@ -0,0 +1,26 @@ +#ifndef _JH_FILTER_H_ +#define _JH_FILTER_H_ + +#include "../parameters/parameters_types.h" +#include "../server/server_types.h" + +#include "filter_types.h" + +int JH_filter_initialize +( +   struct JH_filter filter [const restrict static 1] +); + +int JH_filter_step +( +   struct JH_filter filter [const restrict static 1], +   const int upstream_socket, +   const int downstream_socket +); + +void JH_filter_finalize +( +   struct JH_filter filter [const restrict static 1] +); + +#endif diff --git a/src/filter/filter_types.h b/src/filter/filter_types.h new file mode 100644 index 0000000..7f36d6a --- /dev/null +++ b/src/filter/filter_types.h @@ -0,0 +1,21 @@ +#ifndef _JH_FILTER_TYPES_H_ +#define _JH_FILTER_TYPES_H_ + +#include <stdio.h> + +#define JH_FILTER_BUFFER_SIZE 5 + +enum JH_filter_state +{ +   JH_FILTER_IS_SENDING_DOWNSTREAM, +   JH_FILTER_IS_SENDING_UPSTREAM +}; + +struct JH_filter +{ +   enum JH_filter_state state; +   int buffer_index; +   int prev_c; +}; + +#endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..b8bef88 --- /dev/null +++ b/src/main.c @@ -0,0 +1,53 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> + +#include "error/error.h" +#include "parameters/parameters.h" +#include "server/server.h" + +#include "pervasive.h" + +static void print_help (const char runnable [const restrict static 1]) +{ +   printf +   ( +      "JabberHive - Lowercase\n" +      "Software Version %d\n" +      "Protocol Version %d\n" +      "\nUsages:\n" +      "   JH GATEWAY:\t%s SOCKET_NAME DESTINATION\n" +      "   SHOW HELP:\tAnything else.\n" +      "\nParameters:\n" +      "   SOCKET_NAME:\tValid UNIX socket.\n" +      "   DESTINATION:\tValid UNIX socket.\n", +      JH_PROGRAM_VERSION, +      JH_PROTOCOL_VERSION, +      runnable +   ); +} + + +int main (int const argc, const char * argv [const static argc]) +{ +   struct JH_parameters params; + +   if (JH_parameters_initialize(¶ms, argc, argv) < 0) +   { +      print_help(argv[0]); + +      return -1; +   } + +   if (JH_server_main(¶ms) < 0) +   { +      return -1; +   } + +   return 0; +} diff --git a/src/parameters/CMakeLists.txt b/src/parameters/CMakeLists.txt new file mode 100644 index 0000000..2aa7ece --- /dev/null +++ b/src/parameters/CMakeLists.txt @@ -0,0 +1,7 @@ +set( +   SRC_FILES ${SRC_FILES} +   ${CMAKE_CURRENT_SOURCE_DIR}/parameters.c +   ${CMAKE_CURRENT_SOURCE_DIR}/parameters_getters.c +) +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/parameters/parameters.c b/src/parameters/parameters.c new file mode 100644 index 0000000..1118be1 --- /dev/null +++ b/src/parameters/parameters.c @@ -0,0 +1,87 @@ +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <limits.h> + +#include "../core/index.h" +#include "../error/error.h" + +#include "parameters.h" + +static void set_default_to_all_fields +( +   struct JH_parameters param [const restrict static 1] +) +{ +   param->socket_name = (const char *) NULL; +   param->dest_socket_name = (const char *) NULL; +} + +static int is_valid +( +   struct JH_parameters param [const restrict static 1] +) +{ +   int valid; + +   valid = 1; + +   if (param->socket_name == (const char *) NULL) +   { +      JH_S_FATAL(stderr, "Missing parameter: This entity's socket name."); + +      valid = 0; +   } + +   if (param->dest_socket_name == (const char *) NULL) +   { +      JH_S_FATAL(stderr, "Missing parameter: The destination's socket name."); + +      valid = 0; +   } + +   return valid; +} + +static void set_parameters +( +   struct JH_parameters param [const restrict static 1], +   int const argc, +   const char * argv [const static argc] +) +{ +   if (argc < 2) +   { +      return; +   } + +   param->socket_name = argv[1]; + +   if (argc < 3) +   { +      return; +   } + +   param->dest_socket_name = argv[2]; + +} + +int JH_parameters_initialize +( +   struct JH_parameters param [const restrict static 1], +   int const argc, +   const char * argv [const static argc] +) +{ +   set_default_to_all_fields(param); + +   set_parameters(param, argc, argv); + +   if (!is_valid(param)) +   { +      return -1; +   } + +   return 0; +} diff --git a/src/parameters/parameters.h b/src/parameters/parameters.h new file mode 100644 index 0000000..89f6e55 --- /dev/null +++ b/src/parameters/parameters.h @@ -0,0 +1,25 @@ +#ifndef _JH_CLI_PARAMETERS_H_ +#define _JH_CLI_PARAMETERS_H_ + +#include <stdlib.h> + +#include "parameters_types.h" + +int JH_parameters_initialize +( +   struct JH_parameters param [const restrict static 1], +   int const argc, +   const char * argv [const static argc] +); + +const char * JH_parameters_get_socket_name +( +   const struct JH_parameters param [const restrict static 1] +); + +const char * JH_parameters_get_dest_socket_name +( +   const struct JH_parameters param [const restrict static 1] +); + +#endif diff --git a/src/parameters/parameters_getters.c b/src/parameters/parameters_getters.c new file mode 100644 index 0000000..fddc3a7 --- /dev/null +++ b/src/parameters/parameters_getters.c @@ -0,0 +1,19 @@ +#include <stdlib.h> + +#include "parameters.h" + +const char * JH_parameters_get_socket_name +( +   const struct JH_parameters param [const restrict static 1] +) +{ +   return param->socket_name; +} + +const char * JH_parameters_get_dest_socket_name +( +   const struct JH_parameters param [const restrict static 1] +) +{ +   return param->dest_socket_name; +} diff --git a/src/parameters/parameters_types.h b/src/parameters/parameters_types.h new file mode 100644 index 0000000..667def7 --- /dev/null +++ b/src/parameters/parameters_types.h @@ -0,0 +1,18 @@ +#ifndef _JH_CLI_PARAMETERS_TYPES_H_ +#define _JH_CLI_PARAMETERS_TYPES_H_ + +#include <stdlib.h> + +#define JH_PARAMETERS_COUNT 2 + +#define JH_PARAMETERS_DEFAULT_MAIN_STORAGE_FILENAME "storage.txt" +#define JH_PARAMETERS_DEFAULT_TEMP_STORAGE_PREFIX    "/tmp/jabberhive-storage/storage_thread_" + +struct JH_parameters +{ +   /* JH **********************************************************************/ +   const char * restrict socket_name; +   const char * restrict dest_socket_name; +}; + +#endif diff --git a/src/pervasive.h b/src/pervasive.h new file mode 100644 index 0000000..27d832d --- /dev/null +++ b/src/pervasive.h @@ -0,0 +1,28 @@ +#ifndef _JH_PERVASIVE_H_ +#define _JH_PERVASIVE_H_ + +#include <string.h> + +#define JH_PROGRAM_VERSION   1 +#define JH_PROTOCOL_VERSION  1 + +#ifdef __FRAMA_C__ +   #define JH_RUNNING_FRAMA_C   1 +#endif + +#define JH_DEBUG_ALL 1 + +#ifndef JH_DEBUG_ALL +   #define JH_DEBUG_ALL 0 +#endif + +#define JH__TO_STRING(x) #x +#define JH_TO_STRING(x) JH__TO_STRING(x) +#define JH_ISOLATE(a) do {a} while (0) + +/* strncmp stops at '\0' and strlen does not count '\0'. */ +#define JH_IS_PREFIX(a, b) (strncmp(a, b, strlen(a)) == 0) + +#define JH_STRING_EQUALS(a, b) (strcmp(a, b) == 0) + +#endif diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt new file mode 100644 index 0000000..9ade789 --- /dev/null +++ b/src/server/CMakeLists.txt @@ -0,0 +1,15 @@ +set( +   SRC_FILES ${SRC_FILES} +   ${CMAKE_CURRENT_SOURCE_DIR}/server.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_create_socket.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_finalize.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_initialize.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_joining_threads.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_new_connection.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_signal.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_wait_for_event.c +   ${CMAKE_CURRENT_SOURCE_DIR}/server_worker.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/server/server.c b/src/server/server.c new file mode 100644 index 0000000..640584f --- /dev/null +++ b/src/server/server.c @@ -0,0 +1,101 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +int JH_server_main +( +   const struct JH_parameters params [const restrict static 1] +) +{ +   struct JH_server server; +   JH_index retries; + +   retries = 0; +   /* TODO +   if (JH_server_set_signal_handlers < 0) +   { +      return -1; +   } +   */ + +   if (JH_server_initialize(&server, params) < 0) +   { +      return -1; +   } + +   while (JH_server_is_running()) +   { +      switch (JH_server_wait_for_event(&server)) +      { +         case 0: /* Timed out or signal'd. */ +            JH_S_DEBUG(stderr, 1, "Timed out..."); +            JH_server_handle_joining_threads(&server); + +            retries = 0; + +            break; + +         case 1: /* New client attempted connection. */ +            JH_S_DEBUG(stderr, 1, "New connection."); +            JH_server_handle_joining_threads(&server); +            (void) JH_server_handle_new_connection(&server); + +            retries = 0; + +            break; + +         case -1: /* Something bad happened. */ +            retries += 1; + +            if (retries == JH_SERVER_MAX_RETRIES) +            { +               JH_server_finalize(&server); + +               return -1; +            } + +            break; + +         default: +            JH_S_PROG_ERROR +            ( +               stderr, +               "Unexpected wait_for_event return value." +            ); + +            break; +      } +   } + +   /* Waiting for the threads to join... */ +   while (server.workers.currently_running > 0) +   { +      switch (JH_server_wait_for_event(&server)) +      { +         case 0: /* Timed out. */ +         case 1: /* New client attempted connection. */ +            JH_server_handle_joining_threads(&server); +            break; + +         case -1: /* Something bad happened. */ +            retries += 1; + +            if (retries == JH_SERVER_MAX_RETRIES) +            { +               JH_server_finalize(&server); + +               return -1; +            } +            break; +      } +   } + +   JH_server_finalize(&server); + +   return 0; +} + diff --git a/src/server/server.h b/src/server/server.h new file mode 100644 index 0000000..16ce785 --- /dev/null +++ b/src/server/server.h @@ -0,0 +1,84 @@ +#ifndef _JH_SERVER_SERVER_H_ +#define _JH_SERVER_SERVER_H_ + +#include "../parameters/parameters_types.h" + +#include "server_types.h" + +int JH_server_initialize +( +   struct JH_server server [const restrict static 1], +   const struct JH_parameters params [const restrict static 1] +); + +int JH_server_socket_open +( +   struct JH_server_socket server_socket [const restrict static 1], +   const char socket_name [const restrict static 1] +); + +void JH_server_request_termination (void); +int JH_server_is_running (void); +int JH_server_set_signal_handlers (void); + +int JH_server_main +( +   const struct JH_parameters params [const restrict static 1] +); + +void JH_server_finalize (struct JH_server [const restrict static 1]); + +int JH_server_wait_for_event +( +   struct JH_server server [const restrict static 1] +); + +void JH_server_handle_joining_threads +( +   struct JH_server server [const restrict static 1] +); + +int JH_server_handle_new_connection +( +   struct JH_server server [const restrict static 1] +); + +void * JH_server_worker_main (void * input); +void * JH_server_worker_data_merger_main (void * input); + +int JH_server_worker_receive +( +   struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_handle_request +( +   struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_pipelining_support +( +   struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_protocol_version +( +   struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_positive +( +   struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_negative +( +   struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_generated_reply +( +   struct JH_server_worker worker [const restrict static 1] +); + +#endif diff --git a/src/server/server_create_socket.c b/src/server/server_create_socket.c new file mode 100644 index 0000000..5e0c00b --- /dev/null +++ b/src/server/server_create_socket.c @@ -0,0 +1,195 @@ +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +#include <sys/socket.h> +#include <sys/un.h> + +#include "../error/error.h" + +#include "server.h" + +static int create_socket (int result [const restrict static 1]) +{ +   const int old_errno = errno; + +   errno = 0; +   *result = socket(AF_UNIX, SOCK_STREAM, 0); + +   if (*result == -1) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to create server socket: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      return -1; +   } + +   errno = old_errno; + +   return 0; +} + +static int bind_socket +( +   const int socket, +   const char socket_name [const restrict static 1] +) +{ +   struct sockaddr_un addr; +   const int old_errno = errno; + +   errno = 0; +   memset(&addr, 0, sizeof(struct sockaddr_un)); + +   addr.sun_family = AF_UNIX; + +   strncpy +   ( +      (void *) addr.sun_path, +      (const void *) socket_name, +      (sizeof(addr.sun_path) - 1) +   ); + +   errno = old_errno; + +   if +   ( +      bind +      ( +         socket, +         (const struct sockaddr *) &addr, +         (socklen_t) sizeof(struct sockaddr_un) +      ) != 0 +   ) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to bind server socket to %s: %s.", +         socket_name, +         strerror(errno) +      ); + +      errno = old_errno; + +      return -1; +   } + +   errno = old_errno; + +   return 0; +} + +static int set_socket_to_unblocking (const int socket) +{ +   int current_flags; +   const int old_errno = errno; + +   current_flags = fcntl(socket, F_GETFD); + +   if (current_flags == -1) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to get server socket properties: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      return -1; +   } + +   /* current_flags = current_flags & (~O_NONBLOCK); */ + +   current_flags = fcntl(socket, F_SETFD, (current_flags | O_NONBLOCK)); + +   if (current_flags == -1) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to set server socket properties: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      return -2; +   } + +   errno = old_errno; + +   return 0; +} + +static int set_socket_as_listener (const int socket) +{ +   const int old_errno = errno; + +   if (listen(socket, JH_SERVER_SOCKET_LISTEN_BACKLOG) != 0) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to set server socket properties: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      return -1; +   } + +   errno = old_errno; + +   return 0; +} + +int JH_server_socket_open +( +   struct JH_server_socket server_socket [const restrict static 1], +   const char socket_name [const restrict static 1] +) +{ +   printf("\"%s\"\n", socket_name); +   if (create_socket(&(server_socket->file_descriptor)) < 0) +   { +      return -1; +   } + +   if (bind_socket(server_socket->file_descriptor, socket_name) < 0) +   { +      close(server_socket->file_descriptor); + +      return -1; +   } + +   if (set_socket_to_unblocking(server_socket->file_descriptor) < 0) +   { +      close(server_socket->file_descriptor); + +      return -1; +   } + +   if (set_socket_as_listener(server_socket->file_descriptor) < 0) +   { +      close(server_socket->file_descriptor); + +      return -1; +   } + +   FD_ZERO(&(server_socket->as_a_set)); +   FD_SET(server_socket->file_descriptor, &(server_socket->as_a_set)); + +   return 0; +} diff --git a/src/server/server_finalize.c b/src/server/server_finalize.c new file mode 100644 index 0000000..25ea672 --- /dev/null +++ b/src/server/server_finalize.c @@ -0,0 +1,42 @@ +#include <stdlib.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static void finalize_thread_collection +( +   struct JH_server_thread_collection workers [const restrict static 1] +) +{ +   free((void *) workers->threads); + +   workers->threads_capacity = 0; + +   pthread_mutex_destroy(&(workers->mutex)); +   pthread_barrier_destroy(&(workers->barrier)); + +   workers->currently_running = 0; +} + +static void finalize_socket +( +   struct JH_server_socket socket [const restrict static 1] +) +{ +   FD_ZERO(&(socket->as_a_set)); + +   close(socket->file_descriptor); + +   socket->file_descriptor = -1; +} + +void JH_server_finalize +( +   struct JH_server server [const restrict static 1] +) +{ +   finalize_thread_collection(&(server->workers)); +   finalize_socket(&(server->socket)); +} diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c new file mode 100644 index 0000000..4b94fc6 --- /dev/null +++ b/src/server/server_initialize.c @@ -0,0 +1,106 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> + +#include "../core/index.h" +#include "../parameters/parameters.h" + +#include "server.h" + +static int initialize_worker_collection +( +   struct JH_server_thread_collection c [const restrict static 1], +   const struct JH_parameters params [const restrict static 1] +) +{ +   int error; + +   c->threads = (struct JH_server_thread_data *) NULL; +   c->threads_capacity = 0; +   c->currently_running = 0; + +   error = +      pthread_mutex_init +      ( +         &(c->mutex), +         (const pthread_mutexattr_t *) NULL +      ); + +   if (error != 0) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to initialize worker collection's mutex (error: %d): %s.", +         error, +         strerror(error) +      ); + +      return -1; +   } + +   error = +      pthread_barrier_init +      ( +         &(c->barrier), +         (const pthread_barrierattr_t *) NULL, +         2 +      ); + +   if (error != 0) +   { +      pthread_mutex_destroy(&(c->mutex)); + +      JH_FATAL +      ( +         stderr, +         "Unable to initialize worker collection's barrier (error: %d): %s.", +         error, +         strerror(error) +      ); + +      return -1; +   } + +   return 0; +} + +void initialize_thread_parameters +( +   struct JH_server server [const restrict static 1], +   const struct JH_parameters params [const restrict static 1] +) +{ +   server->thread_params.thread_collection = &(server->workers); +   server->thread_params.server_params = params; +   server->thread_params.socket = -1; +} + +int JH_server_initialize +( +   struct JH_server server [const restrict static 1], +   const struct JH_parameters params [const restrict static 1] +) +{ +   if (initialize_worker_collection(&(server->workers), params) < 0) +   { +      return -1; +   } + +   if +   ( +      JH_server_socket_open +      ( +         &(server->socket), +         JH_parameters_get_socket_name(params) +      ) < 0 +   ) +   { +      return -2; +   } + +   initialize_thread_parameters(server, params); + +   return 0; +} diff --git a/src/server/server_joining_threads.c b/src/server/server_joining_threads.c new file mode 100644 index 0000000..e1d92ca --- /dev/null +++ b/src/server/server_joining_threads.c @@ -0,0 +1,38 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +void JH_server_handle_joining_threads +( +   struct JH_server server [const restrict static 1] +) +{ +   JH_index i; + +   pthread_mutex_lock(&(server->workers.mutex)); + +   for (i = 0; i < server->workers.threads_capacity; ++i) +   { +      if (server->workers.threads[i].state == JH_SERVER_JOINING_THREAD) +      { +         JH_DEBUG(stderr, 1, "Joining thread %u", i); + +         pthread_join(server->workers.threads[i].posix_id, (void **) NULL); + +         server->workers.threads[i].state = JH_SERVER_NO_THREAD; + +         server->workers.currently_running -= 1; +      } +   } + +   pthread_mutex_unlock(&(server->workers.mutex)); +} diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c new file mode 100644 index 0000000..af007ee --- /dev/null +++ b/src/server/server_new_connection.c @@ -0,0 +1,184 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static int get_new_socket (struct JH_server server [const restrict static 1]) +{ +   const int old_errno = errno; + +   server->thread_params.socket = +      accept +      ( +         server->socket.file_descriptor, +         (struct sockaddr *) NULL, +         (socklen_t *) NULL +      ); + +   if (server->thread_params.socket == -1) +   { +      JH_ERROR +      ( +         stderr, +         "Unable to accept on the server's socket: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      return -1; +   } + +   errno = old_errno; + +   return 0; +} + +static int get_new_thread (struct JH_server server [const restrict static 1]) +{ +   struct JH_server_thread_data * new_threads; +   JH_index i; + +   pthread_mutex_lock(&(server->workers.mutex)); + +   for (i = 0; i < server->workers.threads_capacity; ++i) +   { +      if (server->workers.threads[i].state == JH_SERVER_NO_THREAD) +      { +         server->thread_params.thread_id = i; + +         pthread_mutex_unlock(&(server->workers.mutex)); + +         return 0; +      } +   } + +   if +   ( +      (server->workers.threads_capacity == JH_INDEX_MAX) +      || +      ( +         (size_t) (server->workers.threads_capacity + 1) +         > (SIZE_MAX / sizeof(struct JH_server_thread_data)) +      ) +   ) +   { +      JH_S_ERROR +      ( +         stderr, +         "Maximum number of concurrent threads attained, unable to add more." +      ); + +      pthread_mutex_unlock(&(server->workers.mutex)); + +      return -1; +   } + +   server->thread_params.thread_id = server->workers.threads_capacity; +   server->workers.threads_capacity += 1; + +   new_threads = +      (struct JH_server_thread_data *) realloc +      ( +         server->workers.threads, +         ( +            sizeof(struct JH_server_thread_data) +            * ((size_t) server->workers.threads_capacity) +         ) +      ); + +   if (new_threads == ((struct JH_server_thread_data *) NULL)) +   { +      JH_S_ERROR +      ( +         stderr, +         "Reallocation of the threads' data list failed." +      ); + +      pthread_mutex_unlock(&(server->workers.mutex)); + +      return -1; +   } + +   server->workers.threads = new_threads; + +   pthread_mutex_unlock(&(server->workers.mutex)); + +   return 0; +} + +static int spawn_thread +( +   struct JH_server server [const restrict static 1], +   void * (*thread_main) (void *) +) +{ +   const JH_index thread_id = server->thread_params.thread_id; +   int error; + +   server->workers.threads[thread_id].state = JH_SERVER_RUNNING_THREAD; + +   error = +      pthread_create +      ( +         &(server->workers.threads[thread_id].posix_id), +         (const pthread_attr_t *) NULL, +         thread_main, +         (void *) &(server->thread_params) +      ); + +   if (error != 0) +   { +      JH_ERROR +      ( +         stderr, +         "Unable to spawn thread: %s.", +         strerror(error) +      ); + +      server->workers.threads[thread_id].state = JH_SERVER_NO_THREAD; + +      return -1; +   } + +   pthread_barrier_wait(&(server->workers.barrier)); + +   server->workers.currently_running += 1; + +   return 0; +} + +int JH_server_handle_new_connection +( +   struct JH_server server [const restrict static 1] +) +{ +   if (get_new_socket(server) < 0) +   { +      return -1; +   } + +   if (get_new_thread(server) < 0) +   { +      close(server->thread_params.socket); + +      return -2; +   } + +   if (spawn_thread(server, JH_server_worker_main) < 0) +   { +      close(server->thread_params.socket); + +      return -3; +   } + +   return 0; +} diff --git a/src/server/server_signal.c b/src/server/server_signal.c new file mode 100644 index 0000000..9361382 --- /dev/null +++ b/src/server/server_signal.c @@ -0,0 +1,41 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "server.h" + +static volatile char JH_SERVER_IS_RUNNING = (char) 1; + +static void request_termination (int const signo) +{ +   if ((signo == SIGINT) || (signo == SIGTERM)) +   { +      JH_server_request_termination(); +   } +} + +void JH_server_request_termination (void) +{ +   JH_SERVER_IS_RUNNING = (char) 0; +} + +int JH_server_is_running (void) +{ +   return (int) JH_SERVER_IS_RUNNING; +} + +int JH_server_set_signal_handlers (void) +{ +   /* +   struct sigaction act; + +      act.sa_handler = request_termination; +      act.sa_mask = +      act.sa_flags = +      act.sa_restorer = +   */ + +   /* TODO */ + +   return -1; +} diff --git a/src/server/server_types.h b/src/server/server_types.h new file mode 100644 index 0000000..62e785b --- /dev/null +++ b/src/server/server_types.h @@ -0,0 +1,81 @@ +#ifndef _JH_SERVER_SERVER_TYPES_H_ +#define _JH_SERVER_SERVER_TYPES_H_ + +#include <sys/time.h> +#include <stdio.h> + +#ifndef JH_RUNNING_FRAMA_C +   #include <pthread.h> +#endif + +#include "../core/index_types.h" + +#include "../parameters/parameters_types.h" + +#include "../error/error.h" + +#include "../filter/filter_types.h" + +#define JH_SERVER_MAX_RETRIES 10 +#define JH_SERVER_BUFFER_SIZE 0 + +#define JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC 5 +#define JH_SERVER_SOCKET_LISTEN_BACKLOG     5 + +#define JH_SERVER_WORKER_MAX_WAITING_TIME 5 + +enum JH_server_thread_state +{ +   JH_SERVER_JOINING_THREAD, +   JH_SERVER_RUNNING_THREAD, +   JH_SERVER_NO_THREAD +}; + +struct JH_server_thread_data +{ +#ifndef JH_RUNNING_FRAMA_C +   pthread_t posix_id; +#endif +   enum JH_server_thread_state state; +}; + +struct JH_server_thread_collection +{ +   struct JH_server_thread_data * threads; +   JH_index threads_capacity; /* protected by merger_mutex */ +#ifndef JH_RUNNING_FRAMA_C +   pthread_mutex_t mutex; +   pthread_barrier_t barrier; +#endif +   JH_index currently_running; +}; + +struct JH_server_socket +{ +   int file_descriptor; +   fd_set as_a_set; +   struct timeval timeout; +}; + +struct JH_server_thread_parameters +{ +   struct JH_server_thread_collection * thread_collection; +   const struct JH_parameters * server_params; +   JH_index thread_id; +   int socket; +}; + +struct JH_server_worker +{ +   struct JH_server_thread_parameters params; +   int downstream_socket; +}; + +struct JH_server +{ +   struct JH_server_thread_collection workers; +   struct JH_server_socket socket; +   struct JH_server_thread_parameters thread_params; +}; + +#endif diff --git a/src/server/server_wait_for_event.c b/src/server/server_wait_for_event.c new file mode 100644 index 0000000..c949438 --- /dev/null +++ b/src/server/server_wait_for_event.c @@ -0,0 +1,62 @@ +#include <sys/select.h> + +#include <errno.h> +#include <stdio.h> +#include <string.h> + +#include "../error/error.h" + +#include "server.h" + +int JH_server_wait_for_event +( +   struct JH_server server [const static 1] +) +{ +   int ready_fds; +   const int old_errno = errno; +   fd_set ready_to_read; + +   ready_to_read = server->socket.as_a_set; + +   /* call to select may alter timeout */ +   memset((void *) &(server->socket.timeout), 0, sizeof(struct timeval)); + +   server->socket.timeout.tv_sec = JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC; + +   errno = 0; + +   ready_fds = select +   ( +      (server->socket.file_descriptor + 1), +      &ready_to_read, +      (fd_set *) NULL, +      (fd_set *) NULL, +      &(server->socket.timeout) +   ); + +   JH_DEBUG(stderr, 1, "SELECT returned: %i, errno is %i.", ready_fds, errno); + +   if (errno == EINTR) +   { +      ready_fds = 0; +   } + +   if (ready_fds == -1) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to wait on server socket: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      return -1; +   } + +   errno = old_errno; + +   return ready_fds; +} diff --git a/src/server/server_worker.c b/src/server/server_worker.c new file mode 100644 index 0000000..0542b1f --- /dev/null +++ b/src/server/server_worker.c @@ -0,0 +1,189 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <errno.h> + +#include "../filter/filter.h" + +#include "../parameters/parameters.h" + +#include "server.h" + +static int connect_downstream +( +   struct JH_server_worker worker [const restrict static 1] +) +{ +   struct sockaddr_un addr; + +   const int old_errno = errno; + +   errno = 0; + +   if ((worker->downstream_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to create socket: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      return -1; +   } + +   errno = old_errno; + +   memset((void *) &addr, (int) 0, sizeof(addr)); + +   addr.sun_family = AF_UNIX; + +   strncpy +   ( +      (char *) addr.sun_path, +      JH_parameters_get_dest_socket_name(worker->params.server_params), +      (sizeof(addr.sun_path) - ((size_t) 1)) +   ); + +   errno = 0; + +   if +   ( +      connect +      ( +         worker->downstream_socket, +         (struct sockaddr *) &addr, +         sizeof(addr) +      ) +      == -1 +   ) +   { +      JH_FATAL +      ( +         stderr, +         "Unable to connect to address: %s.", +         strerror(errno) +      ); + +      errno = old_errno; + +      close(worker->downstream_socket); + +      worker->downstream_socket = -1; + +      return -1; +   } + +   errno = old_errno; + +   return 0; +} + +static int initialize +( +   struct JH_server_worker worker [const restrict static 1], +   void * input +) +{ +   memcpy +   ( +      (void *) &(worker->params), +      (const void *) input, +      sizeof(struct JH_server_thread_parameters) +   ); + +   pthread_barrier_wait(&(worker->params.thread_collection->barrier)); + +   return connect_downstream(worker); +} + +static void finalize +( +   struct JH_server_worker worker [const restrict static 1] +) +{ +   if (worker->downstream_socket != -1) +   { +      close(worker->downstream_socket); + +      worker->downstream_socket = -1; +   } + +   if (worker->params.socket != -1) +   { +      close(worker->params.socket); + +      worker->params.socket = -1; +   } + +   pthread_mutex_lock(&(worker->params.thread_collection->mutex)); + +   worker->params.thread_collection->threads[worker->params.thread_id].state = +      JH_SERVER_JOINING_THREAD; + +   pthread_mutex_unlock(&(worker->params.thread_collection->mutex)); +} + +void * JH_server_worker_main (void * input) +{ +   int status; +   int timeout_count; +   struct JH_filter filter; +   struct JH_server_worker worker; + +   initialize(&worker, input); + +   if (JH_filter_initialize(&filter) < 0) +   { +      finalize(&worker); + +      return NULL; +   } + +   timeout_count = 0; + +   while (JH_server_is_running()) +   { +      status = +         JH_filter_step +         ( +            &filter, +            worker.params.socket, +            worker.downstream_socket +         ); + +      if (status == 0) +      { +         timeout_count = 0; +      } +      else if (status == 1) +      { +         timeout_count += 1; + +         if (timeout_count == 2) +         { +            break; +         } +         else +         { +            sleep(JH_SERVER_WORKER_MAX_WAITING_TIME); +         } +      } +      else +      { +         break; +      } +   } + +   JH_filter_finalize(&filter); + +   finalize(&worker); + +   return NULL; +} | 


