| summaryrefslogtreecommitdiff |
diff options
| author | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-14 17:16:14 +0200 |
|---|---|---|
| committer | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-14 17:16:14 +0200 |
| commit | 08dca7c0f612864513354288fc676968cddfbd96 (patch) | |
| tree | 050d25cde509c708f265c235a330f20aadd692e8 | |
Initial commit
34 files changed, 2551 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..b35d2f1 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,30 @@ +cmake_minimum_required(VERSION 3.0 FATAL_ERROR) + +project("JabberHive - Storage") + +include(FindPkgConfig) + +add_subdirectory(src) +add_definitions(-D_POSIX_SOURCE) +add_definitions(-D_POSIX_C_SOURCE=200809L) + +set(CMAKE_C_FLAGS $ENV{CFLAGS}) +if(CMAKE_COMPILER_IS_GNUCC) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -Wpedantic -Wconversion") + message(STATUS "GNUCC detected. Adding '-O3' parameter.") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3") +endif() +message(STATUS "CFLAGS=${CMAKE_C_FLAGS}") + +# ${SRC_FILES} is recursively defined in the subdirectories. +# Each subdirectory only adds the source files that are present at its level. +add_executable(jabberhive-storage ${SRC_FILES}) +set_property(TARGET jabberhive-storage PROPERTY C_STANDARD 99) +set_property(TARGET jabberhive-storage PROPERTY C_STANDARD_REQUIRED ON) + +find_package(Threads) +target_link_libraries(jabberhive-storage ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(jabberhive-storage m) + +## OPTION HANDLING ############################################################# +# TODO @@ -0,0 +1,27 @@ +Copyright (c) 2017, NathanaĆ«l Sensfelder +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of JabberHive nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..313e70a --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +## What is JabberHive? +JabberHive is a modular ChatBot system. All "modules" are in fact separate +programs linked together using the JabberHive Protocol. Please refer to the +protocol for more information. + +## Component Description +* Filter for a JabberHive network. +* Stores passing STRINGs from ?RLR and ?RL requests. + +## JabberHive Protocol Compatibility +* **Protocol Version(s):** 1. +* **Inbound Connections:** Multiple. +* **Outbound Connections:** Multiple. +* **Pipelining:** No. +* **Behavior:** Filter. + +## Notes +* Does not correctly reply to Pipelining & Protocol Version requests. +* The jabberhive-cli Gateway can be used to feed a storage file to a JabberHive +network. + +## Dependencies +- POSIX compliant OS. +- C compiler (with C99 support). +- CMake. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..0b242bc --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,12 @@ +add_subdirectory(error) +add_subdirectory(core) +add_subdirectory(filter) +add_subdirectory(server) +add_subdirectory(parameters) + +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/main.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt new file mode 100644 index 0000000..f5ede7c --- /dev/null +++ b/src/core/CMakeLists.txt @@ -0,0 +1,7 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/index.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/core/index.c b/src/core/index.c new file mode 100644 index 0000000..6066e3c --- /dev/null +++ b/src/core/index.c @@ -0,0 +1,19 @@ +#include <math.h> + +#include "index_types.h" + +size_t JH_index_string_length (const JH_index i) +{ + if (i == 0) + { + return 1; + } + else if (i > 0) + { + return (size_t) (log10l((long double) i) + 1); + } + else + { + return (size_t) (log10l((long double) (-i)) + 2); + } +} diff --git a/src/core/index.h b/src/core/index.h new file mode 100644 index 0000000..52077b6 --- /dev/null +++ b/src/core/index.h @@ -0,0 +1,8 @@ +#ifndef _JH_CORE_INDEX_H_ +#define _JH_CORE_INDEX_H_ + +#include "index_types.h" + +size_t JH_index_string_length (const JH_index i); + +#endif diff --git a/src/core/index_types.h b/src/core/index_types.h new file mode 100644 index 0000000..2180815 --- /dev/null +++ b/src/core/index_types.h @@ -0,0 +1,28 @@ +#ifndef _JH_CORE_INDEX_TYPES_H_ +#define _JH_CORE_INDEX_TYPES_H_ + +#include "../pervasive.h" + +/* + * JH_index is a replacement for size_t. As many indices are stored for every + * word learned, having control over which type of variable is used to represent + * those indices lets us scale the RAM usage. + */ + +#include <limits.h> +#include <stdint.h> + +/* Must be unsigned. */ +typedef unsigned int JH_index; + +/* Must be > 0. */ +#define JH_INDEX_MAX UINT_MAX +#define JH_INDEX_TAG "%u" + +#ifndef JH_RUNNING_FRAMA_C + #if (JH_INDEX_MAX > SIZE_MAX) + #error "JH_index should not be able to go higher than a size_t variable." + #endif +#endif + +#endif diff --git a/src/error/CMakeLists.txt b/src/error/CMakeLists.txt new file mode 100644 index 0000000..fa07534 --- /dev/null +++ b/src/error/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/error/error.h b/src/error/error.h new file mode 100644 index 0000000..145c838 --- /dev/null +++ b/src/error/error.h @@ -0,0 +1,143 @@ +#ifndef _JH_ERROR_ERROR_H_ +#define _JH_ERROR_ERROR_H_ + +#include <stdio.h> + +#include "../pervasive.h" + +#ifndef JH_DEBUG_PROGRAM_FLOW + #define JH_DEBUG_PROGRAM_FLOW (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_CONFIG + #define JH_DEBUG_CONFIG (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_LEARNING + #define JH_DEBUG_LEARNING (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_NETWORK + #define JH_DEBUG_NETWORK 1 +#endif + +#ifndef JH_DEBUG_NETWORK + #define JH_DEBUG_NETWORK (0 || JH_DEBUG_ALL) +#endif + +#define JH_ENABLE_WARNINGS_OUTPUT 1 +#define JH_ENABLE_RUNTIME_ERRORS_OUTPUT 1 +#define JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT 1 +#define JH_ENABLE_FATAL_ERROR_OUTPUT 1 + +#ifdef JH_ENABLE_ERROR_LOCATION + #define JH_LOCATION " [" __FILE__ "][" JH_TO_STRING(__LINE__) "]" +#else + #define JH_LOCATION "" +#endif + +#define JH_PRINT_STDERR(io, symbol, str, ...)\ + fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n", __VA_ARGS__); + +/* + * Given that we use preprocessor contants as flags, we can expect the compilers + * to remove the test condition for disabled flags. No need to be shy about + * allowing many debug options. + */ + +#define JH_DEBUG(io, flag, str, ...)\ + JH_ISOLATE\ + (\ + if (flag)\ + {\ + JH_PRINT_STDERR(io, "D", str, __VA_ARGS__);\ + }\ + ) + + +#define JH_WARNING(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_WARNINGS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "W", str, __VA_ARGS__);\ + }\ + ) + +#define JH_ERROR(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "E", str, __VA_ARGS__);\ + }\ + ) + +#define JH_PROG_ERROR(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "P", str, __VA_ARGS__);\ + }\ + ) + +#define JH_FATAL(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "F", str, __VA_ARGS__);\ + }\ + ) + +/* For outputs without dynamic content (static). ******************************/ + +#define JH_PRINT_S_STDERR(io, symbol, str)\ + fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n"); + +#define JH_S_DEBUG(io, flag, str)\ + JH_ISOLATE\ + (\ + if (flag)\ + {\ + JH_PRINT_S_STDERR(io, "D", str);\ + }\ + ) + +#define JH_S_WARNING(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_WARNINGS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "W", str);\ + }\ + ) + +#define JH_S_ERROR(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "E", str);\ + }\ + ) + +#define JH_S_PROG_ERROR(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "P", str);\ + }\ + ) + +#define JH_S_FATAL(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "F", str);\ + }\ + ) +#endif diff --git a/src/filter/CMakeLists.txt b/src/filter/CMakeLists.txt new file mode 100644 index 0000000..c8add44 --- /dev/null +++ b/src/filter/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/filter.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/filter/filter.c b/src/filter/filter.c new file mode 100644 index 0000000..f5f89c7 --- /dev/null +++ b/src/filter/filter.c @@ -0,0 +1,368 @@ +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "../pervasive.h" +#include "../error/error.h" +#include "../parameters/parameters.h" + +#include "filter.h" + +static int send_downstream +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket, + FILE storage_file [const restrict static 1] +) +{ + char c; + ssize_t io_bytes; + const int old_errno = errno; + + for (;;) + { + errno = 0; + + io_bytes = + read + ( + upstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream read error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + else if (io_bytes == 0) + { + return 1; + } + + switch (filter->buffer_index) + { + case 0: + if (c == '?') + { + filter->buffer_index = 1; + } + else + { + filter->buffer_index = -1; + } + break; + + case 1: + if (c == 'R') + { + filter->buffer_index = 2; + } + else + { + filter->buffer_index = -1; + } + break; + + case 2: + if (c == 'L') + { + filter->buffer_index = 3; + } + else + { + filter->buffer_index = -1; + } + break; + + case 3: + if (c == ' ') + { + filter->buffer_index = JH_FILTER_BUFFER_SIZE; + } + else if (c == 'R') + { + filter->buffer_index = 4; + } + else + { + filter->buffer_index = -1; + } + break; + + case 4: + if (c == ' ') + { + filter->buffer_index = JH_FILTER_BUFFER_SIZE; + } + else + { + filter->buffer_index = -1; + } + break; + + case JH_FILTER_BUFFER_SIZE: + if (putc(c, storage_file) != c) + { + /* TODO */ + JH_S_WARNING(stderr, "Writing to temp file failed."); + } + + if (c == '\n') + { + fflush(storage_file); + } + + break; + + default: + break; + } + + errno = 0; + + io_bytes = write + ( + downstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream write error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + if (c == '\n') + { + filter->buffer_index = 0; + filter->state = JH_FILTER_IS_SENDING_UPSTREAM; + + return 0; + } + } +} + +static int send_upstream +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket +) +{ + char c; + ssize_t io_bytes; + const int old_errno = errno; + + for (;;) + { + errno = 0; + + io_bytes = + read + ( + downstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Downstream read error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + else if (io_bytes == 0) + { + return 1; + } + + errno = 0; + + io_bytes = write + ( + upstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream write error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + switch (filter->buffer_index) + { + case -1: + if (c == '\n') + { + filter->buffer_index = 0; + } + break; + + case 0: + if (c == '!') + { + filter->buffer_index = 1; + } + else + { + filter->buffer_index = -1; + } + break; + + case 1: + if ((c == 'N') || (c == 'P')) + { + filter->buffer_index = 2; + } + else + { + filter->buffer_index = -1; + } + break; + + case 2: + if (c == ' ') + { + filter->buffer_index = 3; + } + else + { + filter->buffer_index = -1; + } + break; + + case 3: + if (c == '\n') + { + filter->buffer_index = 0; + filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM; + + return 0; + } + else + { + filter->buffer_index = -1; + } + break; + + default: + JH_PROG_ERROR + ( + stderr, + "Invalid value for 'filter->buffer_index': %d.", + filter->buffer_index + ); + + filter->buffer_index = 0; + + return -1; + } + } + + return -1; +} + +/******************************************************************************/ +/** EXPORTED ******************************************************************/ +/******************************************************************************/ + +int JH_filter_step +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket, + FILE storage_file [const restrict static 1] +) +{ + switch (filter->state) + { + case JH_FILTER_IS_SENDING_DOWNSTREAM: + JH_DEBUG(stderr, 1, "<SENDING_DOWN> (index: %d)", filter->buffer_index); + return + send_downstream + ( + filter, + upstream_socket, + downstream_socket, + storage_file + ); + + case JH_FILTER_IS_SENDING_UPSTREAM: + JH_DEBUG(stderr, 1, "<SENDING_UP> (index: %d)", filter->buffer_index); + return + send_upstream + ( + filter, + upstream_socket, + downstream_socket + ); + + default: + return -1; + } +} + +int JH_filter_initialize +( + struct JH_filter filter [const restrict static 1] +) +{ + filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM; + filter->buffer_index = 0; + + return 0; +} + +void JH_filter_finalize +( + struct JH_filter filter [const restrict static 1] +) +{ + /* Nothing to do */ +} diff --git a/src/filter/filter.h b/src/filter/filter.h new file mode 100644 index 0000000..ccad593 --- /dev/null +++ b/src/filter/filter.h @@ -0,0 +1,27 @@ +#ifndef _JH_FILTER_H_ +#define _JH_FILTER_H_ + +#include "../parameters/parameters_types.h" +#include "../server/server_types.h" + +#include "filter_types.h" + +int JH_filter_initialize +( + struct JH_filter filter [const restrict static 1] +); + +int JH_filter_step +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket, + FILE storage_file [const restrict static 1] +); + +void JH_filter_finalize +( + struct JH_filter filter [const restrict static 1] +); + +#endif diff --git a/src/filter/filter_types.h b/src/filter/filter_types.h new file mode 100644 index 0000000..3b9c28e --- /dev/null +++ b/src/filter/filter_types.h @@ -0,0 +1,20 @@ +#ifndef _JH_FILTER_TYPES_H_ +#define _JH_FILTER_TYPES_H_ + +#include <stdio.h> + +#define JH_FILTER_BUFFER_SIZE 5 + +enum JH_filter_state +{ + JH_FILTER_IS_SENDING_DOWNSTREAM, + JH_FILTER_IS_SENDING_UPSTREAM +}; + +struct JH_filter +{ + enum JH_filter_state state; + int buffer_index; +}; + +#endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..9216104 --- /dev/null +++ b/src/main.c @@ -0,0 +1,54 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> + +#include "error/error.h" +#include "parameters/parameters.h" +#include "server/server.h" + +#include "pervasive.h" + +static void print_help (const char runnable [const restrict static 1]) +{ + printf + ( + "JabberHive - Limiter\n" + "Software Version %d\n" + "Protocol Version %d\n" + "\nUsages:\n" + " JH GATEWAY:\t%s SOCKET_NAME DESTINATION REPLY_RATE\n" + " SHOW HELP:\tAnything else.\n" + "\nParameters:\n" + " SOCKET_NAME:\tValid UNIX socket.\n" + " DESTINATION:\tValid UNIX socket.\n" + " REPLY_RATE:\tInteger [0,100].\n", + JH_PROGRAM_VERSION, + JH_PROTOCOL_VERSION, + runnable + ); +} + + +int main (int const argc, const char * argv [const static argc]) +{ + struct JH_parameters params; + + if (JH_parameters_initialize(¶ms, argc, argv) < 0) + { + print_help(argv[0]); + + return -1; + } + + if (JH_server_main(¶ms) < 0) + { + return -1; + } + + return 0; +} diff --git a/src/parameters/CMakeLists.txt b/src/parameters/CMakeLists.txt new file mode 100644 index 0000000..2aa7ece --- /dev/null +++ b/src/parameters/CMakeLists.txt @@ -0,0 +1,7 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/parameters.c + ${CMAKE_CURRENT_SOURCE_DIR}/parameters_getters.c +) +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/parameters/parameters.c b/src/parameters/parameters.c new file mode 100644 index 0000000..80cb01b --- /dev/null +++ b/src/parameters/parameters.c @@ -0,0 +1,100 @@ +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <limits.h> + +#include "../core/index.h" +#include "../error/error.h" + +#include "parameters.h" + +static void set_default_to_all_fields +( + struct JH_parameters param [const restrict static 1] +) +{ + param->main_storage_filename = JH_PARAMETERS_DEFAULT_MAIN_STORAGE_FILENAME; + param->temp_storage_prefix = JH_PARAMETERS_DEFAULT_TEMP_STORAGE_PREFIX; + param->temp_storage_prefix_length = strlen(param->temp_storage_prefix); + param->socket_name = (const char *) NULL; + param->dest_socket_name = (const char *) NULL; +} + +static int is_valid +( + struct JH_parameters param [const restrict static 1] +) +{ + int valid; + + valid = 1; + + if (param->socket_name == (const char *) NULL) + { + JH_S_FATAL(stderr, "Missing parameter: This entity's socket name."); + + valid = 0; + } + + if (param->dest_socket_name == (const char *) NULL) + { + JH_S_FATAL(stderr, "Missing parameter: The destination's socket name."); + + valid = 0; + } + + if + ( + (SIZE_MAX - JH_index_string_length(JH_INDEX_MAX)) + <= param->temp_storage_prefix_length + ) + { + JH_S_FATAL(stderr, "Temporary storage prefix is too long."); + + valid = 0; + } + + return valid; +} + +static void set_parameters +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +) +{ + if (argc < 2) + { + return; + } + + param->socket_name = argv[1]; + + if (argc < 3) + { + return; + } + + param->dest_socket_name = argv[2]; +} + +int JH_parameters_initialize +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +) +{ + set_default_to_all_fields(param); + + set_parameters(param, argc, argv); + + if (!is_valid(param)) + { + return -1; + } + + return 0; +} diff --git a/src/parameters/parameters.h b/src/parameters/parameters.h new file mode 100644 index 0000000..d8382a2 --- /dev/null +++ b/src/parameters/parameters.h @@ -0,0 +1,40 @@ +#ifndef _JH_CLI_PARAMETERS_H_ +#define _JH_CLI_PARAMETERS_H_ + +#include <stdlib.h> + +#include "parameters_types.h" + +int JH_parameters_initialize +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +); + +const char * JH_parameters_get_main_storage_filename +( + const struct JH_parameters param [const restrict static 1] +); + +const char * JH_parameters_get_temp_storage_prefix +( + const struct JH_parameters param [const restrict static 1] +); + +size_t JH_parameters_get_temp_storage_prefix_length +( + const struct JH_parameters param [const restrict static 1] +); + +const char * JH_parameters_get_socket_name +( + const struct JH_parameters param [const restrict static 1] +); + +const char * JH_parameters_get_dest_socket_name +( + const struct JH_parameters param [const restrict static 1] +); + +#endif diff --git a/src/parameters/parameters_getters.c b/src/parameters/parameters_getters.c new file mode 100644 index 0000000..75bf1fb --- /dev/null +++ b/src/parameters/parameters_getters.c @@ -0,0 +1,43 @@ +#include <stdlib.h> + +#include "parameters.h" + +const char * JH_parameters_get_main_storage_filename +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->main_storage_filename; +} + +const char * JH_parameters_get_temp_storage_prefix +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->temp_storage_prefix; +} + +size_t JH_parameters_get_temp_storage_prefix_length +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->temp_storage_prefix_length; +} + +const char * JH_parameters_get_socket_name +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->socket_name; +} + +const char * JH_parameters_get_dest_socket_name +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->dest_socket_name; +} diff --git a/src/parameters/parameters_types.h b/src/parameters/parameters_types.h new file mode 100644 index 0000000..c21d14b --- /dev/null +++ b/src/parameters/parameters_types.h @@ -0,0 +1,22 @@ +#ifndef _JH_CLI_PARAMETERS_TYPES_H_ +#define _JH_CLI_PARAMETERS_TYPES_H_ + +#include <stdlib.h> + +#define JH_PARAMETERS_COUNT 3 + +#define JH_PARAMETERS_DEFAULT_MAIN_STORAGE_FILENAME "storage.txt" +#define JH_PARAMETERS_DEFAULT_TEMP_STORAGE_PREFIX "/tmp/jabberhive-storage/storage_thread_" + +struct JH_parameters +{ + const char * restrict main_storage_filename; + const char * restrict temp_storage_prefix; + size_t temp_storage_prefix_length; + + /* JH **********************************************************************/ + const char * restrict socket_name; + const char * restrict dest_socket_name; +}; + +#endif diff --git a/src/pervasive.h b/src/pervasive.h new file mode 100644 index 0000000..27d832d --- /dev/null +++ b/src/pervasive.h @@ -0,0 +1,28 @@ +#ifndef _JH_PERVASIVE_H_ +#define _JH_PERVASIVE_H_ + +#include <string.h> + +#define JH_PROGRAM_VERSION 1 +#define JH_PROTOCOL_VERSION 1 + +#ifdef __FRAMA_C__ + #define JH_RUNNING_FRAMA_C 1 +#endif + +#define JH_DEBUG_ALL 1 + +#ifndef JH_DEBUG_ALL + #define JH_DEBUG_ALL 0 +#endif + +#define JH__TO_STRING(x) #x +#define JH_TO_STRING(x) JH__TO_STRING(x) +#define JH_ISOLATE(a) do {a} while (0) + +/* strncmp stops at '\0' and strlen does not count '\0'. */ +#define JH_IS_PREFIX(a, b) (strncmp(a, b, strlen(a)) == 0) + +#define JH_STRING_EQUALS(a, b) (strcmp(a, b) == 0) + +#endif diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt new file mode 100644 index 0000000..625749b --- /dev/null +++ b/src/server/CMakeLists.txt @@ -0,0 +1,16 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/server.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_create_socket.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_finalize.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_initialize.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_joining_threads.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_new_connection.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_signal.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_wait_for_event.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_worker.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_worker_data_merger.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/server/server.c b/src/server/server.c new file mode 100644 index 0000000..640584f --- /dev/null +++ b/src/server/server.c @@ -0,0 +1,101 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +int JH_server_main +( + const struct JH_parameters params [const restrict static 1] +) +{ + struct JH_server server; + JH_index retries; + + retries = 0; + /* TODO + if (JH_server_set_signal_handlers < 0) + { + return -1; + } + */ + + if (JH_server_initialize(&server, params) < 0) + { + return -1; + } + + while (JH_server_is_running()) + { + switch (JH_server_wait_for_event(&server)) + { + case 0: /* Timed out or signal'd. */ + JH_S_DEBUG(stderr, 1, "Timed out..."); + JH_server_handle_joining_threads(&server); + + retries = 0; + + break; + + case 1: /* New client attempted connection. */ + JH_S_DEBUG(stderr, 1, "New connection."); + JH_server_handle_joining_threads(&server); + (void) JH_server_handle_new_connection(&server); + + retries = 0; + + break; + + case -1: /* Something bad happened. */ + retries += 1; + + if (retries == JH_SERVER_MAX_RETRIES) + { + JH_server_finalize(&server); + + return -1; + } + + break; + + default: + JH_S_PROG_ERROR + ( + stderr, + "Unexpected wait_for_event return value." + ); + + break; + } + } + + /* Waiting for the threads to join... */ + while (server.workers.currently_running > 0) + { + switch (JH_server_wait_for_event(&server)) + { + case 0: /* Timed out. */ + case 1: /* New client attempted connection. */ + JH_server_handle_joining_threads(&server); + break; + + case -1: /* Something bad happened. */ + retries += 1; + + if (retries == JH_SERVER_MAX_RETRIES) + { + JH_server_finalize(&server); + + return -1; + } + break; + } + } + + JH_server_finalize(&server); + + return 0; +} + diff --git a/src/server/server.h b/src/server/server.h new file mode 100644 index 0000000..8f13662 --- /dev/null +++ b/src/server/server.h @@ -0,0 +1,98 @@ +#ifndef _JH_SERVER_SERVER_H_ +#define _JH_SERVER_SERVER_H_ + +#include "../parameters/parameters_types.h" + +#include "server_types.h" + +int JH_server_initialize +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +); + +int JH_server_socket_open +( + struct JH_server_socket server_socket [const restrict static 1], + const char socket_name [const restrict static 1] +); + +void JH_server_request_termination (void); +int JH_server_is_running (void); +int JH_server_set_signal_handlers (void); + +int JH_server_main +( + const struct JH_parameters params [const restrict static 1] +); + +void JH_server_finalize (struct JH_server [const restrict static 1]); + +int JH_server_wait_for_event +( + struct JH_server server [const restrict static 1] +); + +void JH_server_handle_joining_threads +( + struct JH_server server [const restrict static 1] +); + +int JH_server_handle_new_connection +( + struct JH_server server [const restrict static 1] +); + +int JH_server_worker_data_merger_thread_init +( + struct JH_server server [const restrict static 1] +); + +void * JH_server_worker_main (void * input); +void * JH_server_worker_data_merger_main (void * input); + +/* Requires ownership of worker->params.thread_collection->mutex */ +FILE * JH_server_worker_open_storage_file +( + struct JH_server_thread_collection collection [const restrict static 1], + const struct JH_parameters params [const restrict static 1], + const JH_index thread_id, + const char mode [restrict static 1] +); + +int JH_server_worker_receive +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_handle_request +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_pipelining_support +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_protocol_version +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_positive +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_negative +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_generated_reply +( + struct JH_server_worker worker [const restrict static 1] +); + +#endif diff --git a/src/server/server_create_socket.c b/src/server/server_create_socket.c new file mode 100644 index 0000000..5e0c00b --- /dev/null +++ b/src/server/server_create_socket.c @@ -0,0 +1,195 @@ +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +#include <sys/socket.h> +#include <sys/un.h> + +#include "../error/error.h" + +#include "server.h" + +static int create_socket (int result [const restrict static 1]) +{ + const int old_errno = errno; + + errno = 0; + *result = socket(AF_UNIX, SOCK_STREAM, 0); + + if (*result == -1) + { + JH_FATAL + ( + stderr, + "Unable to create server socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int bind_socket +( + const int socket, + const char socket_name [const restrict static 1] +) +{ + struct sockaddr_un addr; + const int old_errno = errno; + + errno = 0; + memset(&addr, 0, sizeof(struct sockaddr_un)); + + addr.sun_family = AF_UNIX; + + strncpy + ( + (void *) addr.sun_path, + (const void *) socket_name, + (sizeof(addr.sun_path) - 1) + ); + + errno = old_errno; + + if + ( + bind + ( + socket, + (const struct sockaddr *) &addr, + (socklen_t) sizeof(struct sockaddr_un) + ) != 0 + ) + { + JH_FATAL + ( + stderr, + "Unable to bind server socket to %s: %s.", + socket_name, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int set_socket_to_unblocking (const int socket) +{ + int current_flags; + const int old_errno = errno; + + current_flags = fcntl(socket, F_GETFD); + + if (current_flags == -1) + { + JH_FATAL + ( + stderr, + "Unable to get server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + /* current_flags = current_flags & (~O_NONBLOCK); */ + + current_flags = fcntl(socket, F_SETFD, (current_flags | O_NONBLOCK)); + + if (current_flags == -1) + { + JH_FATAL + ( + stderr, + "Unable to set server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -2; + } + + errno = old_errno; + + return 0; +} + +static int set_socket_as_listener (const int socket) +{ + const int old_errno = errno; + + if (listen(socket, JH_SERVER_SOCKET_LISTEN_BACKLOG) != 0) + { + JH_FATAL + ( + stderr, + "Unable to set server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +int JH_server_socket_open +( + struct JH_server_socket server_socket [const restrict static 1], + const char socket_name [const restrict static 1] +) +{ + printf("\"%s\"\n", socket_name); + if (create_socket(&(server_socket->file_descriptor)) < 0) + { + return -1; + } + + if (bind_socket(server_socket->file_descriptor, socket_name) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + if (set_socket_to_unblocking(server_socket->file_descriptor) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + if (set_socket_as_listener(server_socket->file_descriptor) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + FD_ZERO(&(server_socket->as_a_set)); + FD_SET(server_socket->file_descriptor, &(server_socket->as_a_set)); + + return 0; +} diff --git a/src/server/server_finalize.c b/src/server/server_finalize.c new file mode 100644 index 0000000..81d1021 --- /dev/null +++ b/src/server/server_finalize.c @@ -0,0 +1,45 @@ +#include <stdlib.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static void finalize_thread_collection +( + struct JH_server_thread_collection workers [const restrict static 1] +) +{ + free((void *) workers->threads); + free((void *) workers->storage_filename); + + workers->threads_capacity = 0; + + pthread_mutex_destroy(&(workers->mutex)); + pthread_barrier_destroy(&(workers->barrier)); + pthread_mutex_destroy(&(workers->merger_mutex)); + pthread_cond_destroy(&(workers->merger_condition)); + + workers->currently_running = 0; +} + +static void finalize_socket +( + struct JH_server_socket socket [const restrict static 1] +) +{ + FD_ZERO(&(socket->as_a_set)); + + close(socket->file_descriptor); + + socket->file_descriptor = -1; +} + +void JH_server_finalize +( + struct JH_server server [const restrict static 1] +) +{ + finalize_thread_collection(&(server->workers)); + finalize_socket(&(server->socket)); +} diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c new file mode 100644 index 0000000..93a3b02 --- /dev/null +++ b/src/server/server_initialize.c @@ -0,0 +1,182 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> + +#include "../core/index.h" +#include "../parameters/parameters.h" + +#include "server.h" + +static int initialize_worker_collection +( + struct JH_server_thread_collection c [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + int error; + + c->threads = (struct JH_server_thread_data *) NULL; + c->threads_capacity = 0; + c->currently_running = 0; + + error = + pthread_mutex_init + ( + &(c->mutex), + (const pthread_mutexattr_t *) NULL + ); + + if (error != 0) + { + JH_FATAL + ( + stderr, + "Unable to initialize worker collection's mutex (error: %d): %s.", + error, + strerror(error) + ); + + return -1; + } + + error = + pthread_barrier_init + ( + &(c->barrier), + (const pthread_barrierattr_t *) NULL, + 2 + ); + + if (error != 0) + { + pthread_mutex_destroy(&(c->mutex)); + + JH_FATAL + ( + stderr, + "Unable to initialize worker collection's barrier (error: %d): %s.", + error, + strerror(error) + ); + + return -1; + } + + error = + pthread_mutex_init + ( + &(c->merger_mutex), + (const pthread_mutexattr_t *) NULL + ); + + if (error != 0) + { + pthread_mutex_destroy(&(c->mutex)); + pthread_barrier_destroy(&(c->barrier)); + + JH_FATAL + ( + stderr, + "Unable to initialize worker data merger mutex (error: %d): %s.", + error, + strerror(error) + ); + + return -1; + } + + error = + pthread_cond_init + ( + &(c->merger_condition), + (const pthread_condattr_t *) NULL + ); + + if (error != 0) + { + pthread_mutex_destroy(&(c->mutex)); + pthread_barrier_destroy(&(c->barrier)); + pthread_mutex_destroy(&(c->merger_mutex)); + + JH_FATAL + ( + stderr, + "Unable to initialize worker data merger condition (error: %d): %s.", + error, + strerror(error) + ); + + return -1; + } + + c->storage_filename = + (char *) calloc + ( + ( + JH_parameters_get_temp_storage_prefix_length(params) + + ((size_t) JH_index_string_length(JH_INDEX_MAX)) + ), + sizeof(char) + ); + + if (c->storage_filename == (char *) NULL) + { + pthread_mutex_destroy(&(c->mutex)); + pthread_barrier_destroy(&(c->barrier)); + pthread_mutex_destroy(&(c->merger_mutex)); + pthread_cond_destroy(&(c->merger_condition)); + + JH_FATAL + ( + stderr, + "Unable to allocate memory to store worker temp storage filename " + "(error: %d): %s.", + error, + strerror(error) + ); + + return -1; + } + + return 0; +} + +void initialize_thread_parameters +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + server->thread_params.thread_collection = &(server->workers); + server->thread_params.server_params = params; + server->thread_params.socket = -1; +} + +int JH_server_initialize +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + if (initialize_worker_collection(&(server->workers), params) < 0) + { + return -1; + } + + if + ( + JH_server_socket_open + ( + &(server->socket), + JH_parameters_get_socket_name(params) + ) < 0 + ) + { + return -2; + } + + initialize_thread_parameters(server, params); + + return JH_server_worker_data_merger_thread_init(server); +} diff --git a/src/server/server_joining_threads.c b/src/server/server_joining_threads.c new file mode 100644 index 0000000..e1d92ca --- /dev/null +++ b/src/server/server_joining_threads.c @@ -0,0 +1,38 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +void JH_server_handle_joining_threads +( + struct JH_server server [const restrict static 1] +) +{ + JH_index i; + + pthread_mutex_lock(&(server->workers.mutex)); + + for (i = 0; i < server->workers.threads_capacity; ++i) + { + if (server->workers.threads[i].state == JH_SERVER_JOINING_THREAD) + { + JH_DEBUG(stderr, 1, "Joining thread %u", i); + + pthread_join(server->workers.threads[i].posix_id, (void **) NULL); + + server->workers.threads[i].state = JH_SERVER_NO_THREAD; + + server->workers.currently_running -= 1; + } + } + + pthread_mutex_unlock(&(server->workers.mutex)); +} diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c new file mode 100644 index 0000000..0734249 --- /dev/null +++ b/src/server/server_new_connection.c @@ -0,0 +1,202 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static int get_new_socket (struct JH_server server [const restrict static 1]) +{ + const int old_errno = errno; + + server->thread_params.socket = + accept + ( + server->socket.file_descriptor, + (struct sockaddr *) NULL, + (socklen_t *) NULL + ); + + if (server->thread_params.socket == -1) + { + JH_ERROR + ( + stderr, + "Unable to accept on the server's socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int get_new_thread (struct JH_server server [const restrict static 1]) +{ + struct JH_server_thread_data * new_threads; + JH_index i; + + pthread_mutex_lock(&(server->workers.mutex)); + + for (i = 0; i < server->workers.threads_capacity; ++i) + { + if (server->workers.threads[i].state == JH_SERVER_NO_THREAD) + { + server->thread_params.thread_id = i; + + pthread_mutex_unlock(&(server->workers.mutex)); + + return 0; + } + } + + if + ( + (server->workers.threads_capacity == JH_INDEX_MAX) + || + ( + (size_t) (server->workers.threads_capacity + 1) + > (SIZE_MAX / sizeof(struct JH_server_thread_data)) + ) + ) + { + JH_S_ERROR + ( + stderr, + "Maximum number of concurrent threads attained, unable to add more." + ); + + pthread_mutex_unlock(&(server->workers.mutex)); + + return -1; + } + + server->thread_params.thread_id = server->workers.threads_capacity; + server->workers.threads_capacity += 1; + + new_threads = + (struct JH_server_thread_data *) realloc + ( + server->workers.threads, + ( + sizeof(struct JH_server_thread_data) + * ((size_t) server->workers.threads_capacity) + ) + ); + + if (new_threads == ((struct JH_server_thread_data *) NULL)) + { + JH_S_ERROR + ( + stderr, + "Reallocation of the threads' data list failed." + ); + + pthread_mutex_unlock(&(server->workers.mutex)); + + return -1; + } + + server->workers.threads = new_threads; + + pthread_mutex_unlock(&(server->workers.mutex)); + + return 0; +} + +static int spawn_thread +( + struct JH_server server [const restrict static 1], + void * (*thread_main) (void *) +) +{ + const JH_index thread_id = server->thread_params.thread_id; + int error; + + server->workers.threads[thread_id].state = JH_SERVER_RUNNING_THREAD; + + error = + pthread_create + ( + &(server->workers.threads[thread_id].posix_id), + (const pthread_attr_t *) NULL, + thread_main, + (void *) &(server->thread_params) + ); + + if (error != 0) + { + JH_ERROR + ( + stderr, + "Unable to spawn thread: %s.", + strerror(error) + ); + + server->workers.threads[thread_id].state = JH_SERVER_NO_THREAD; + + return -1; + } + + pthread_barrier_wait(&(server->workers.barrier)); + + server->workers.currently_running += 1; + + return 0; +} + +int JH_server_handle_new_connection +( + struct JH_server server [const restrict static 1] +) +{ + if (get_new_socket(server) < 0) + { + return -1; + } + + if (get_new_thread(server) < 0) + { + close(server->thread_params.socket); + + return -2; + } + + if (spawn_thread(server, JH_server_worker_main) < 0) + { + close(server->thread_params.socket); + + return -3; + } + + return 0; +} + +int JH_server_worker_data_merger_thread_init +( + struct JH_server server [const restrict static 1] +) +{ + if (get_new_thread(server) < 0) + { + return -1; + } + + if (spawn_thread(server, JH_server_worker_data_merger_main) < 0) + { + return -2; + } + + return 0; +} diff --git a/src/server/server_signal.c b/src/server/server_signal.c new file mode 100644 index 0000000..9361382 --- /dev/null +++ b/src/server/server_signal.c @@ -0,0 +1,41 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "server.h" + +static volatile char JH_SERVER_IS_RUNNING = (char) 1; + +static void request_termination (int const signo) +{ + if ((signo == SIGINT) || (signo == SIGTERM)) + { + JH_server_request_termination(); + } +} + +void JH_server_request_termination (void) +{ + JH_SERVER_IS_RUNNING = (char) 0; +} + +int JH_server_is_running (void) +{ + return (int) JH_SERVER_IS_RUNNING; +} + +int JH_server_set_signal_handlers (void) +{ + /* + struct sigaction act; + + act.sa_handler = request_termination; + act.sa_mask = + act.sa_flags = + act.sa_restorer = + */ + + /* TODO */ + + return -1; +} diff --git a/src/server/server_types.h b/src/server/server_types.h new file mode 100644 index 0000000..3159ca5 --- /dev/null +++ b/src/server/server_types.h @@ -0,0 +1,87 @@ +#ifndef _JH_SERVER_SERVER_TYPES_H_ +#define _JH_SERVER_SERVER_TYPES_H_ + +#include <sys/time.h> +#include <stdio.h> + +#ifndef JH_RUNNING_FRAMA_C + #include <pthread.h> +#endif + +#include "../core/index_types.h" + +#include "../parameters/parameters_types.h" + +#include "../error/error.h" + +#include "../filter/filter_types.h" + +#define JH_SERVER_MAX_RETRIES 10 +#define JH_SERVER_BUFFER_SIZE 0 + +#define JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC 5 +#define JH_SERVER_SOCKET_LISTEN_BACKLOG 5 + +#define JH_SERVER_WORKER_MAX_WAITING_TIME 5 + +enum JH_server_thread_state +{ + JH_SERVER_JOINING_THREAD, + JH_SERVER_TERMINATED_THREAD, + JH_SERVER_RUNNING_THREAD, + JH_SERVER_NO_THREAD +}; + +struct JH_server_thread_data +{ +#ifndef JH_RUNNING_FRAMA_C + pthread_t posix_id; +#endif + enum JH_server_thread_state state; + FILE storage_file; +}; + +struct JH_server_thread_collection +{ + struct JH_server_thread_data * threads; + char * storage_filename; /* protected by mutex */ + JH_index threads_capacity; /* protected by merger_mutex */ +#ifndef JH_RUNNING_FRAMA_C + pthread_mutex_t mutex; + pthread_barrier_t barrier; +#endif + pthread_mutex_t merger_mutex; + pthread_cond_t merger_condition; + JH_index currently_running; +}; + +struct JH_server_socket +{ + int file_descriptor; + fd_set as_a_set; + struct timeval timeout; +}; + +struct JH_server_thread_parameters +{ + struct JH_server_thread_collection * thread_collection; + const struct JH_parameters * server_params; + JH_index thread_id; + int socket; +}; + +struct JH_server_worker +{ + struct JH_server_thread_parameters params; + int downstream_socket; + FILE * storage_file; +}; + +struct JH_server +{ + struct JH_server_thread_collection workers; + struct JH_server_socket socket; + struct JH_server_thread_parameters thread_params; +}; + +#endif diff --git a/src/server/server_wait_for_event.c b/src/server/server_wait_for_event.c new file mode 100644 index 0000000..c949438 --- /dev/null +++ b/src/server/server_wait_for_event.c @@ -0,0 +1,62 @@ +#include <sys/select.h> + +#include <errno.h> +#include <stdio.h> +#include <string.h> + +#include "../error/error.h" + +#include "server.h" + +int JH_server_wait_for_event +( + struct JH_server server [const static 1] +) +{ + int ready_fds; + const int old_errno = errno; + fd_set ready_to_read; + + ready_to_read = server->socket.as_a_set; + + /* call to select may alter timeout */ + memset((void *) &(server->socket.timeout), 0, sizeof(struct timeval)); + + server->socket.timeout.tv_sec = JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC; + + errno = 0; + + ready_fds = select + ( + (server->socket.file_descriptor + 1), + &ready_to_read, + (fd_set *) NULL, + (fd_set *) NULL, + &(server->socket.timeout) + ); + + JH_DEBUG(stderr, 1, "SELECT returned: %i, errno is %i.", ready_fds, errno); + + if (errno == EINTR) + { + ready_fds = 0; + } + + if (ready_fds == -1) + { + JH_FATAL + ( + stderr, + "Unable to wait on server socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return ready_fds; +} diff --git a/src/server/server_worker.c b/src/server/server_worker.c new file mode 100644 index 0000000..faf8c32 --- /dev/null +++ b/src/server/server_worker.c @@ -0,0 +1,274 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <errno.h> + +#include "../filter/filter.h" + +#include "../parameters/parameters.h" + +#include "server.h" + +static int connect_downstream +( + struct JH_server_worker worker [const restrict static 1] +) +{ + struct sockaddr_un addr; + + const int old_errno = errno; + + errno = 0; + + if ((worker->downstream_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) + { + JH_FATAL + ( + stderr, + "Unable to create socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + memset((void *) &addr, (int) 0, sizeof(addr)); + + addr.sun_family = AF_UNIX; + + strncpy + ( + (char *) addr.sun_path, + JH_parameters_get_dest_socket_name(worker->params.server_params), + (sizeof(addr.sun_path) - ((size_t) 1)) + ); + + errno = 0; + + if + ( + connect + ( + worker->downstream_socket, + (struct sockaddr *) &addr, + sizeof(addr) + ) + == -1 + ) + { + JH_FATAL + ( + stderr, + "Unable to connect to address: %s.", + strerror(errno) + ); + + errno = old_errno; + + close(worker->downstream_socket); + + worker->downstream_socket = -1; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int initialize +( + struct JH_server_worker worker [const restrict static 1], + void * input +) +{ + memcpy + ( + (void *) &(worker->params), + (const void *) input, + sizeof(struct JH_server_thread_parameters) + ); + + pthread_barrier_wait(&(worker->params.thread_collection->barrier)); + + worker->storage_file = (FILE *) NULL; + worker->downstream_socket = -1; + + pthread_mutex_lock(&(worker->params.thread_collection->mutex)); + + worker->storage_file = + JH_server_worker_open_storage_file + ( + worker->params.thread_collection, + worker->params.server_params, + worker->params.thread_id, + "w" + ); + + pthread_mutex_unlock(&(worker->params.thread_collection->mutex)); + + if (worker->storage_file == (FILE *) NULL) + { + return -1; + } + + return connect_downstream(worker); +} + +static void finalize +( + struct JH_server_worker worker [const restrict static 1] +) +{ + if (worker->downstream_socket != -1) + { + close(worker->downstream_socket); + + worker->downstream_socket = -1; + } + + if (worker->params.socket != -1) + { + close(worker->params.socket); + + worker->params.socket = -1; + } + + if (worker->storage_file != (FILE *) NULL) + { + fclose(worker->storage_file); + + worker->storage_file = (FILE *) NULL; + } + + pthread_mutex_lock(&(worker->params.thread_collection->mutex)); + + worker->params.thread_collection->threads[worker->params.thread_id].state = + JH_SERVER_TERMINATED_THREAD; + + pthread_mutex_unlock(&(worker->params.thread_collection->mutex)); + + pthread_mutex_lock(&(worker->params.thread_collection->merger_mutex)); + pthread_cond_signal(&(worker->params.thread_collection->merger_condition)); + pthread_mutex_unlock(&(worker->params.thread_collection->merger_mutex)); +} + + +/* Requires ownership of worker->params.thread_collection->mutex */ +FILE * JH_server_worker_open_storage_file +( + struct JH_server_thread_collection collection [const restrict static 1], + const struct JH_parameters params [const restrict static 1], + const JH_index thread_id, + const char mode [restrict static 1] +) +{ + FILE * in; + const int old_errno = errno; + + errno = 0; + + /**** (Re)generate the temporary storage file name *************************/ + sprintf + ( + collection->storage_filename, + ( + "%s" + JH_INDEX_TAG + ), + JH_parameters_get_temp_storage_prefix(params), + thread_id + ); + + /**** Try to open the file *************************************************/ + in = fopen(collection->storage_filename, mode); + + if (in == (FILE *) NULL) + { + JH_ERROR + ( + stderr, + "Could not open the temporary storage file \"%s\"" + " (errno: %d): %s.", + collection->storage_filename, + errno, + strerror(errno) + ); + + errno = old_errno; + + return (FILE *) NULL; + } + + errno = old_errno; + + return in; +} + +void * JH_server_worker_main (void * input) +{ + int status; + int timeout_count; + struct JH_filter filter; + struct JH_server_worker worker; + + initialize(&worker, input); + + if (JH_filter_initialize(&filter) < 0) + { + finalize(&worker); + + return NULL; + } + + timeout_count = 0; + + while (JH_server_is_running()) + { + status = + JH_filter_step + ( + &filter, + worker.params.socket, + worker.downstream_socket, + worker.storage_file + ); + + if (status == 0) + { + timeout_count = 0; + } + else if (status == 1) + { + timeout_count += 1; + + if (timeout_count == 2) + { + break; + } + else + { + sleep(JH_SERVER_WORKER_MAX_WAITING_TIME); + } + } + else + { + break; + } + } + + JH_filter_finalize(&filter); + + finalize(&worker); + + return NULL; +} diff --git a/src/server/server_worker_data_merger.c b/src/server/server_worker_data_merger.c new file mode 100644 index 0000000..79638db --- /dev/null +++ b/src/server/server_worker_data_merger.c @@ -0,0 +1,190 @@ +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <errno.h> + +#include "../core/index.h" + +#include "../parameters/parameters.h" +#include "../error/error.h" + +#include "../filter/filter.h" + +#include "server.h" + +static int initialize +( + struct JH_server_worker worker [const restrict static 1], + void * input +) +{ + memcpy + ( + (void *) &(worker->params), + (const void *) input, + sizeof(struct JH_server_thread_parameters) + ); + + pthread_barrier_wait(&(worker->params.thread_collection->barrier)); + + return 0; +} + +static void finalize +( + struct JH_server_worker worker [const restrict static 1] +) +{ + close(worker->params.socket); + + pthread_mutex_lock(&(worker->params.thread_collection->mutex)); + + worker->params.thread_collection->threads[worker->params.thread_id].state = + JH_SERVER_JOINING_THREAD; + + pthread_mutex_unlock(&(worker->params.thread_collection->mutex)); +} + +static FILE * open_output_file +( + const struct JH_parameters params [const restrict static 1] +) +{ + FILE * out; + const int old_errno = errno; + + errno = 0; + + out = fopen(JH_parameters_get_main_storage_filename(params), "a"); + + if (out == (FILE *) NULL) + { + JH_ERROR + ( + stderr, + "The worker data merger could not open the main storage file \"%s\"" + " (errno: %d): %s.", + JH_parameters_get_main_storage_filename(params), + errno, + strerror(errno) + ); + + errno = old_errno; + + return (FILE *) NULL; + } + + errno = old_errno; + + return out; +} + +static void merge_thread_data +( + struct JH_server_thread_collection collection [const restrict static 1], + const JH_index thread_id, + const struct JH_parameters params [const restrict static 1] +) +{ + char c; + FILE * in, * out; + + /** Open input & output files **********************************************/ + out = open_output_file(params); + + if (out == (FILE *) NULL) + { + return; + } + + in = + JH_server_worker_open_storage_file + ( + collection, + params, + thread_id, + "r" + ); + + if (in == (FILE *) NULL) + { + fclose(out); + + return; + } + + /** Append content of 'in' to 'out' ****************************************/ + while ((c = (char) fgetc(in)) != EOF) + { + if (fputc(c, out) == EOF) + { + break; + } + } + + if ((ferror(in) != 0) || (ferror(out) != 0)) + { + JH_S_ERROR + ( + stderr, + "The Worker Data Merger could not append to the storage file.\n" + "Please check for any corruptions that could have been added to the " + "end of that file." + ); + } + + /** Close the files & return ***********************************************/ + fclose(in); + fclose(out); + + return; +} + +void * JH_server_worker_data_merger_main (void * input) +{ + JH_index i; + struct JH_server_worker worker; + + initialize(&worker, input); + + pthread_mutex_lock(&(worker.params.thread_collection->merger_mutex)); + + while (JH_server_is_running()) + { + pthread_cond_wait + ( + &(worker.params.thread_collection->merger_condition), + &(worker.params.thread_collection->merger_mutex) + ); + + pthread_mutex_lock(&(worker.params.thread_collection->mutex)); + + for (i = 0; i < worker.params.thread_collection->threads_capacity; ++i) + { + + if + ( + worker.params.thread_collection->threads[i].state + == JH_SERVER_TERMINATED_THREAD + ) + { + merge_thread_data + ( + worker.params.thread_collection, + i, + worker.params.server_params + ); + + worker.params.thread_collection->threads[i].state = + JH_SERVER_JOINING_THREAD; + } + } + + pthread_mutex_unlock(&(worker.params.thread_collection->mutex)); + } + + finalize(&worker); + + return NULL; +} |


