| summaryrefslogtreecommitdiff |
diff options
| author | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-15 12:48:19 +0200 |
|---|---|---|
| committer | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-15 12:48:19 +0200 |
| commit | 52d894c670cf9b9ae5c5276acfa7938fb4fbacd3 (patch) | |
| tree | 96d665b66589166f0ccf6ae9315c19b799562646 | |
Initial commit.
32 files changed, 2077 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..9aeaef1 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,29 @@ +cmake_minimum_required(VERSION 3.0 FATAL_ERROR) + +project("JabberHive - Lowercase") + +include(FindPkgConfig) + +add_subdirectory(src) +add_definitions(-D_POSIX_SOURCE) +add_definitions(-D_POSIX_C_SOURCE=200809L) + +set(CMAKE_C_FLAGS $ENV{CFLAGS}) +if(CMAKE_COMPILER_IS_GNUCC) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -Wpedantic -Wconversion") + message(STATUS "GNUCC detected. Adding '-O3' parameter.") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3") +endif() +message(STATUS "CFLAGS=${CMAKE_C_FLAGS}") + +# ${SRC_FILES} is recursively defined in the subdirectories. +# Each subdirectory only adds the source files that are present at its level. +add_executable(jabberhive-lowercase ${SRC_FILES}) +set_property(TARGET jabberhive-lowercase PROPERTY C_STANDARD 99) +set_property(TARGET jabberhive-lowercase PROPERTY C_STANDARD_REQUIRED ON) + +find_package(Threads) +target_link_libraries(jabberhive-lowercase ${CMAKE_THREAD_LIBS_INIT}) + +## OPTION HANDLING ############################################################# +# TODO @@ -0,0 +1,27 @@ +Copyright (c) 2017, NathanaĆ«l Sensfelder +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of JabberHive nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..9d7049c --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +## What is JabberHive? +JabberHive is a modular ChatBot system. All "modules" are in fact separate +programs linked together using the JabberHive Protocol. Please refer to the +protocol for more information. + +## Component Description +* Filter for a JabberHive network. +* Turns STRING component of "?RL", "?RLR", and "?RR" requests into their +lowercase equivalent. + +## JabberHive Protocol Compatibility +* **Protocol Version(s):** 1. +* **Inbound Connections:** Multiple. +* **Outbound Connections:** Multiple. +* **Pipelining:** No. +* **Behavior:** Filter. + +## Notes +* Does not correctly reply to Pipelining & Protocol Version requests. + +## Dependencies +- POSIX compliant OS. +- C compiler (with C99 support). +- CMake. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..0b242bc --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,12 @@ +add_subdirectory(error) +add_subdirectory(core) +add_subdirectory(filter) +add_subdirectory(server) +add_subdirectory(parameters) + +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/main.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt new file mode 100644 index 0000000..fa07534 --- /dev/null +++ b/src/core/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/core/index.h b/src/core/index.h new file mode 100644 index 0000000..594a1d2 --- /dev/null +++ b/src/core/index.h @@ -0,0 +1,6 @@ +#ifndef _JH_CORE_INDEX_H_ +#define _JH_CORE_INDEX_H_ + +#include "index_types.h" + +#endif diff --git a/src/core/index_types.h b/src/core/index_types.h new file mode 100644 index 0000000..2180815 --- /dev/null +++ b/src/core/index_types.h @@ -0,0 +1,28 @@ +#ifndef _JH_CORE_INDEX_TYPES_H_ +#define _JH_CORE_INDEX_TYPES_H_ + +#include "../pervasive.h" + +/* + * JH_index is a replacement for size_t. As many indices are stored for every + * word learned, having control over which type of variable is used to represent + * those indices lets us scale the RAM usage. + */ + +#include <limits.h> +#include <stdint.h> + +/* Must be unsigned. */ +typedef unsigned int JH_index; + +/* Must be > 0. */ +#define JH_INDEX_MAX UINT_MAX +#define JH_INDEX_TAG "%u" + +#ifndef JH_RUNNING_FRAMA_C + #if (JH_INDEX_MAX > SIZE_MAX) + #error "JH_index should not be able to go higher than a size_t variable." + #endif +#endif + +#endif diff --git a/src/error/CMakeLists.txt b/src/error/CMakeLists.txt new file mode 100644 index 0000000..fa07534 --- /dev/null +++ b/src/error/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/error/error.h b/src/error/error.h new file mode 100644 index 0000000..145c838 --- /dev/null +++ b/src/error/error.h @@ -0,0 +1,143 @@ +#ifndef _JH_ERROR_ERROR_H_ +#define _JH_ERROR_ERROR_H_ + +#include <stdio.h> + +#include "../pervasive.h" + +#ifndef JH_DEBUG_PROGRAM_FLOW + #define JH_DEBUG_PROGRAM_FLOW (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_CONFIG + #define JH_DEBUG_CONFIG (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_LEARNING + #define JH_DEBUG_LEARNING (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_NETWORK + #define JH_DEBUG_NETWORK 1 +#endif + +#ifndef JH_DEBUG_NETWORK + #define JH_DEBUG_NETWORK (0 || JH_DEBUG_ALL) +#endif + +#define JH_ENABLE_WARNINGS_OUTPUT 1 +#define JH_ENABLE_RUNTIME_ERRORS_OUTPUT 1 +#define JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT 1 +#define JH_ENABLE_FATAL_ERROR_OUTPUT 1 + +#ifdef JH_ENABLE_ERROR_LOCATION + #define JH_LOCATION " [" __FILE__ "][" JH_TO_STRING(__LINE__) "]" +#else + #define JH_LOCATION "" +#endif + +#define JH_PRINT_STDERR(io, symbol, str, ...)\ + fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n", __VA_ARGS__); + +/* + * Given that we use preprocessor contants as flags, we can expect the compilers + * to remove the test condition for disabled flags. No need to be shy about + * allowing many debug options. + */ + +#define JH_DEBUG(io, flag, str, ...)\ + JH_ISOLATE\ + (\ + if (flag)\ + {\ + JH_PRINT_STDERR(io, "D", str, __VA_ARGS__);\ + }\ + ) + + +#define JH_WARNING(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_WARNINGS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "W", str, __VA_ARGS__);\ + }\ + ) + +#define JH_ERROR(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "E", str, __VA_ARGS__);\ + }\ + ) + +#define JH_PROG_ERROR(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "P", str, __VA_ARGS__);\ + }\ + ) + +#define JH_FATAL(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "F", str, __VA_ARGS__);\ + }\ + ) + +/* For outputs without dynamic content (static). ******************************/ + +#define JH_PRINT_S_STDERR(io, symbol, str)\ + fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n"); + +#define JH_S_DEBUG(io, flag, str)\ + JH_ISOLATE\ + (\ + if (flag)\ + {\ + JH_PRINT_S_STDERR(io, "D", str);\ + }\ + ) + +#define JH_S_WARNING(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_WARNINGS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "W", str);\ + }\ + ) + +#define JH_S_ERROR(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "E", str);\ + }\ + ) + +#define JH_S_PROG_ERROR(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "P", str);\ + }\ + ) + +#define JH_S_FATAL(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "F", str);\ + }\ + ) +#endif diff --git a/src/filter/CMakeLists.txt b/src/filter/CMakeLists.txt new file mode 100644 index 0000000..c8add44 --- /dev/null +++ b/src/filter/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/filter.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/filter/filter.c b/src/filter/filter.c new file mode 100644 index 0000000..f668756 --- /dev/null +++ b/src/filter/filter.c @@ -0,0 +1,368 @@ +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "../pervasive.h" +#include "../error/error.h" +#include "../parameters/parameters.h" + +#include "filter.h" + +char to_lowercase (const char c) +{ + if ((c >= 'A') && (c <= 'Z')) + { + return 'z' - ('Z' - c); + } + + return c; +} + +static int send_downstream +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket +) +{ + char c; + ssize_t io_bytes; + const int old_errno = errno; + + for (;;) + { + errno = 0; + + io_bytes = + read + ( + upstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream read error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + else if (io_bytes == 0) + { + return 1; + } + + switch (filter->buffer_index) + { + case 0: + if (c == '?') + { + filter->buffer_index = 1; + } + else + { + filter->buffer_index = -1; + } + break; + + case 1: + if (c == 'R') + { + filter->buffer_index = 2; + } + else + { + filter->buffer_index = -1; + } + break; + + case 2: + if ((c == 'L') || (c == 'R')) + { + filter->prev_c = c; + filter->buffer_index = 3; + } + else + { + filter->prev_c = '\0'; + filter->buffer_index = -1; + } + break; + + case 3: + if (c == ' ') + { + filter->buffer_index = JH_FILTER_BUFFER_SIZE; + } + else if ((c == 'R') && (filter->prev_c == 'L')) + { + filter->buffer_index = 4; + } + else + { + filter->buffer_index = -1; + } + break; + + case 4: + if (c == ' ') + { + filter->buffer_index = JH_FILTER_BUFFER_SIZE; + } + else + { + filter->buffer_index = -1; + } + break; + + case JH_FILTER_BUFFER_SIZE: + c = to_lowercase(c); + break; + + default: + break; + } + + errno = 0; + + io_bytes = write + ( + downstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream write error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + if (c == '\n') + { + filter->buffer_index = 0; + filter->state = JH_FILTER_IS_SENDING_UPSTREAM; + + return 0; + } + } +} + +static int send_upstream +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket +) +{ + char c; + ssize_t io_bytes; + const int old_errno = errno; + + for (;;) + { + errno = 0; + + io_bytes = + read + ( + downstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Downstream read error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + else if (io_bytes == 0) + { + return 1; + } + + errno = 0; + + io_bytes = write + ( + upstream_socket, + (void *) &c, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream write error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + switch (filter->buffer_index) + { + case -1: + if (c == '\n') + { + filter->buffer_index = 0; + } + break; + + case 0: + if (c == '!') + { + filter->buffer_index = 1; + } + else + { + filter->buffer_index = -1; + } + break; + + case 1: + if ((c == 'N') || (c == 'P')) + { + filter->buffer_index = 2; + } + else + { + filter->buffer_index = -1; + } + break; + + case 2: + if (c == ' ') + { + filter->buffer_index = 3; + } + else + { + filter->buffer_index = -1; + } + break; + + case 3: + if (c == '\n') + { + filter->buffer_index = 0; + filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM; + + return 0; + } + else + { + filter->buffer_index = -1; + } + break; + + default: + JH_PROG_ERROR + ( + stderr, + "Invalid value for 'filter->buffer_index': %d.", + filter->buffer_index + ); + + filter->buffer_index = 0; + + return -1; + } + } + + return -1; +} + +/******************************************************************************/ +/** EXPORTED ******************************************************************/ +/******************************************************************************/ + +int JH_filter_step +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket +) +{ + switch (filter->state) + { + case JH_FILTER_IS_SENDING_DOWNSTREAM: + JH_DEBUG(stderr, 1, "<SENDING_DOWN> (index: %d)", filter->buffer_index); + return + send_downstream + ( + filter, + upstream_socket, + downstream_socket + ); + + case JH_FILTER_IS_SENDING_UPSTREAM: + JH_DEBUG(stderr, 1, "<SENDING_UP> (index: %d)", filter->buffer_index); + return + send_upstream + ( + filter, + upstream_socket, + downstream_socket + ); + + default: + return -1; + } +} + +int JH_filter_initialize +( + struct JH_filter filter [const restrict static 1] +) +{ + filter->state = JH_FILTER_IS_SENDING_DOWNSTREAM; + filter->buffer_index = 0; + filter->prev_c = '\0'; + + return 0; +} + +void JH_filter_finalize +( + struct JH_filter filter [const restrict static 1] +) +{ + /* Nothing to do */ +} diff --git a/src/filter/filter.h b/src/filter/filter.h new file mode 100644 index 0000000..0b17fde --- /dev/null +++ b/src/filter/filter.h @@ -0,0 +1,26 @@ +#ifndef _JH_FILTER_H_ +#define _JH_FILTER_H_ + +#include "../parameters/parameters_types.h" +#include "../server/server_types.h" + +#include "filter_types.h" + +int JH_filter_initialize +( + struct JH_filter filter [const restrict static 1] +); + +int JH_filter_step +( + struct JH_filter filter [const restrict static 1], + const int upstream_socket, + const int downstream_socket +); + +void JH_filter_finalize +( + struct JH_filter filter [const restrict static 1] +); + +#endif diff --git a/src/filter/filter_types.h b/src/filter/filter_types.h new file mode 100644 index 0000000..7f36d6a --- /dev/null +++ b/src/filter/filter_types.h @@ -0,0 +1,21 @@ +#ifndef _JH_FILTER_TYPES_H_ +#define _JH_FILTER_TYPES_H_ + +#include <stdio.h> + +#define JH_FILTER_BUFFER_SIZE 5 + +enum JH_filter_state +{ + JH_FILTER_IS_SENDING_DOWNSTREAM, + JH_FILTER_IS_SENDING_UPSTREAM +}; + +struct JH_filter +{ + enum JH_filter_state state; + int buffer_index; + int prev_c; +}; + +#endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..b8bef88 --- /dev/null +++ b/src/main.c @@ -0,0 +1,53 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> + +#include "error/error.h" +#include "parameters/parameters.h" +#include "server/server.h" + +#include "pervasive.h" + +static void print_help (const char runnable [const restrict static 1]) +{ + printf + ( + "JabberHive - Lowercase\n" + "Software Version %d\n" + "Protocol Version %d\n" + "\nUsages:\n" + " JH GATEWAY:\t%s SOCKET_NAME DESTINATION\n" + " SHOW HELP:\tAnything else.\n" + "\nParameters:\n" + " SOCKET_NAME:\tValid UNIX socket.\n" + " DESTINATION:\tValid UNIX socket.\n", + JH_PROGRAM_VERSION, + JH_PROTOCOL_VERSION, + runnable + ); +} + + +int main (int const argc, const char * argv [const static argc]) +{ + struct JH_parameters params; + + if (JH_parameters_initialize(¶ms, argc, argv) < 0) + { + print_help(argv[0]); + + return -1; + } + + if (JH_server_main(¶ms) < 0) + { + return -1; + } + + return 0; +} diff --git a/src/parameters/CMakeLists.txt b/src/parameters/CMakeLists.txt new file mode 100644 index 0000000..2aa7ece --- /dev/null +++ b/src/parameters/CMakeLists.txt @@ -0,0 +1,7 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/parameters.c + ${CMAKE_CURRENT_SOURCE_DIR}/parameters_getters.c +) +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/parameters/parameters.c b/src/parameters/parameters.c new file mode 100644 index 0000000..1118be1 --- /dev/null +++ b/src/parameters/parameters.c @@ -0,0 +1,87 @@ +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <limits.h> + +#include "../core/index.h" +#include "../error/error.h" + +#include "parameters.h" + +static void set_default_to_all_fields +( + struct JH_parameters param [const restrict static 1] +) +{ + param->socket_name = (const char *) NULL; + param->dest_socket_name = (const char *) NULL; +} + +static int is_valid +( + struct JH_parameters param [const restrict static 1] +) +{ + int valid; + + valid = 1; + + if (param->socket_name == (const char *) NULL) + { + JH_S_FATAL(stderr, "Missing parameter: This entity's socket name."); + + valid = 0; + } + + if (param->dest_socket_name == (const char *) NULL) + { + JH_S_FATAL(stderr, "Missing parameter: The destination's socket name."); + + valid = 0; + } + + return valid; +} + +static void set_parameters +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +) +{ + if (argc < 2) + { + return; + } + + param->socket_name = argv[1]; + + if (argc < 3) + { + return; + } + + param->dest_socket_name = argv[2]; + +} + +int JH_parameters_initialize +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +) +{ + set_default_to_all_fields(param); + + set_parameters(param, argc, argv); + + if (!is_valid(param)) + { + return -1; + } + + return 0; +} diff --git a/src/parameters/parameters.h b/src/parameters/parameters.h new file mode 100644 index 0000000..89f6e55 --- /dev/null +++ b/src/parameters/parameters.h @@ -0,0 +1,25 @@ +#ifndef _JH_CLI_PARAMETERS_H_ +#define _JH_CLI_PARAMETERS_H_ + +#include <stdlib.h> + +#include "parameters_types.h" + +int JH_parameters_initialize +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +); + +const char * JH_parameters_get_socket_name +( + const struct JH_parameters param [const restrict static 1] +); + +const char * JH_parameters_get_dest_socket_name +( + const struct JH_parameters param [const restrict static 1] +); + +#endif diff --git a/src/parameters/parameters_getters.c b/src/parameters/parameters_getters.c new file mode 100644 index 0000000..fddc3a7 --- /dev/null +++ b/src/parameters/parameters_getters.c @@ -0,0 +1,19 @@ +#include <stdlib.h> + +#include "parameters.h" + +const char * JH_parameters_get_socket_name +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->socket_name; +} + +const char * JH_parameters_get_dest_socket_name +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->dest_socket_name; +} diff --git a/src/parameters/parameters_types.h b/src/parameters/parameters_types.h new file mode 100644 index 0000000..667def7 --- /dev/null +++ b/src/parameters/parameters_types.h @@ -0,0 +1,18 @@ +#ifndef _JH_CLI_PARAMETERS_TYPES_H_ +#define _JH_CLI_PARAMETERS_TYPES_H_ + +#include <stdlib.h> + +#define JH_PARAMETERS_COUNT 2 + +#define JH_PARAMETERS_DEFAULT_MAIN_STORAGE_FILENAME "storage.txt" +#define JH_PARAMETERS_DEFAULT_TEMP_STORAGE_PREFIX "/tmp/jabberhive-storage/storage_thread_" + +struct JH_parameters +{ + /* JH **********************************************************************/ + const char * restrict socket_name; + const char * restrict dest_socket_name; +}; + +#endif diff --git a/src/pervasive.h b/src/pervasive.h new file mode 100644 index 0000000..27d832d --- /dev/null +++ b/src/pervasive.h @@ -0,0 +1,28 @@ +#ifndef _JH_PERVASIVE_H_ +#define _JH_PERVASIVE_H_ + +#include <string.h> + +#define JH_PROGRAM_VERSION 1 +#define JH_PROTOCOL_VERSION 1 + +#ifdef __FRAMA_C__ + #define JH_RUNNING_FRAMA_C 1 +#endif + +#define JH_DEBUG_ALL 1 + +#ifndef JH_DEBUG_ALL + #define JH_DEBUG_ALL 0 +#endif + +#define JH__TO_STRING(x) #x +#define JH_TO_STRING(x) JH__TO_STRING(x) +#define JH_ISOLATE(a) do {a} while (0) + +/* strncmp stops at '\0' and strlen does not count '\0'. */ +#define JH_IS_PREFIX(a, b) (strncmp(a, b, strlen(a)) == 0) + +#define JH_STRING_EQUALS(a, b) (strcmp(a, b) == 0) + +#endif diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt new file mode 100644 index 0000000..9ade789 --- /dev/null +++ b/src/server/CMakeLists.txt @@ -0,0 +1,15 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/server.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_create_socket.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_finalize.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_initialize.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_joining_threads.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_new_connection.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_signal.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_wait_for_event.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_worker.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/server/server.c b/src/server/server.c new file mode 100644 index 0000000..640584f --- /dev/null +++ b/src/server/server.c @@ -0,0 +1,101 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +int JH_server_main +( + const struct JH_parameters params [const restrict static 1] +) +{ + struct JH_server server; + JH_index retries; + + retries = 0; + /* TODO + if (JH_server_set_signal_handlers < 0) + { + return -1; + } + */ + + if (JH_server_initialize(&server, params) < 0) + { + return -1; + } + + while (JH_server_is_running()) + { + switch (JH_server_wait_for_event(&server)) + { + case 0: /* Timed out or signal'd. */ + JH_S_DEBUG(stderr, 1, "Timed out..."); + JH_server_handle_joining_threads(&server); + + retries = 0; + + break; + + case 1: /* New client attempted connection. */ + JH_S_DEBUG(stderr, 1, "New connection."); + JH_server_handle_joining_threads(&server); + (void) JH_server_handle_new_connection(&server); + + retries = 0; + + break; + + case -1: /* Something bad happened. */ + retries += 1; + + if (retries == JH_SERVER_MAX_RETRIES) + { + JH_server_finalize(&server); + + return -1; + } + + break; + + default: + JH_S_PROG_ERROR + ( + stderr, + "Unexpected wait_for_event return value." + ); + + break; + } + } + + /* Waiting for the threads to join... */ + while (server.workers.currently_running > 0) + { + switch (JH_server_wait_for_event(&server)) + { + case 0: /* Timed out. */ + case 1: /* New client attempted connection. */ + JH_server_handle_joining_threads(&server); + break; + + case -1: /* Something bad happened. */ + retries += 1; + + if (retries == JH_SERVER_MAX_RETRIES) + { + JH_server_finalize(&server); + + return -1; + } + break; + } + } + + JH_server_finalize(&server); + + return 0; +} + diff --git a/src/server/server.h b/src/server/server.h new file mode 100644 index 0000000..16ce785 --- /dev/null +++ b/src/server/server.h @@ -0,0 +1,84 @@ +#ifndef _JH_SERVER_SERVER_H_ +#define _JH_SERVER_SERVER_H_ + +#include "../parameters/parameters_types.h" + +#include "server_types.h" + +int JH_server_initialize +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +); + +int JH_server_socket_open +( + struct JH_server_socket server_socket [const restrict static 1], + const char socket_name [const restrict static 1] +); + +void JH_server_request_termination (void); +int JH_server_is_running (void); +int JH_server_set_signal_handlers (void); + +int JH_server_main +( + const struct JH_parameters params [const restrict static 1] +); + +void JH_server_finalize (struct JH_server [const restrict static 1]); + +int JH_server_wait_for_event +( + struct JH_server server [const restrict static 1] +); + +void JH_server_handle_joining_threads +( + struct JH_server server [const restrict static 1] +); + +int JH_server_handle_new_connection +( + struct JH_server server [const restrict static 1] +); + +void * JH_server_worker_main (void * input); +void * JH_server_worker_data_merger_main (void * input); + +int JH_server_worker_receive +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_handle_request +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_pipelining_support +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_protocol_version +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_positive +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_negative +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_generated_reply +( + struct JH_server_worker worker [const restrict static 1] +); + +#endif diff --git a/src/server/server_create_socket.c b/src/server/server_create_socket.c new file mode 100644 index 0000000..5e0c00b --- /dev/null +++ b/src/server/server_create_socket.c @@ -0,0 +1,195 @@ +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +#include <sys/socket.h> +#include <sys/un.h> + +#include "../error/error.h" + +#include "server.h" + +static int create_socket (int result [const restrict static 1]) +{ + const int old_errno = errno; + + errno = 0; + *result = socket(AF_UNIX, SOCK_STREAM, 0); + + if (*result == -1) + { + JH_FATAL + ( + stderr, + "Unable to create server socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int bind_socket +( + const int socket, + const char socket_name [const restrict static 1] +) +{ + struct sockaddr_un addr; + const int old_errno = errno; + + errno = 0; + memset(&addr, 0, sizeof(struct sockaddr_un)); + + addr.sun_family = AF_UNIX; + + strncpy + ( + (void *) addr.sun_path, + (const void *) socket_name, + (sizeof(addr.sun_path) - 1) + ); + + errno = old_errno; + + if + ( + bind + ( + socket, + (const struct sockaddr *) &addr, + (socklen_t) sizeof(struct sockaddr_un) + ) != 0 + ) + { + JH_FATAL + ( + stderr, + "Unable to bind server socket to %s: %s.", + socket_name, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int set_socket_to_unblocking (const int socket) +{ + int current_flags; + const int old_errno = errno; + + current_flags = fcntl(socket, F_GETFD); + + if (current_flags == -1) + { + JH_FATAL + ( + stderr, + "Unable to get server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + /* current_flags = current_flags & (~O_NONBLOCK); */ + + current_flags = fcntl(socket, F_SETFD, (current_flags | O_NONBLOCK)); + + if (current_flags == -1) + { + JH_FATAL + ( + stderr, + "Unable to set server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -2; + } + + errno = old_errno; + + return 0; +} + +static int set_socket_as_listener (const int socket) +{ + const int old_errno = errno; + + if (listen(socket, JH_SERVER_SOCKET_LISTEN_BACKLOG) != 0) + { + JH_FATAL + ( + stderr, + "Unable to set server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +int JH_server_socket_open +( + struct JH_server_socket server_socket [const restrict static 1], + const char socket_name [const restrict static 1] +) +{ + printf("\"%s\"\n", socket_name); + if (create_socket(&(server_socket->file_descriptor)) < 0) + { + return -1; + } + + if (bind_socket(server_socket->file_descriptor, socket_name) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + if (set_socket_to_unblocking(server_socket->file_descriptor) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + if (set_socket_as_listener(server_socket->file_descriptor) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + FD_ZERO(&(server_socket->as_a_set)); + FD_SET(server_socket->file_descriptor, &(server_socket->as_a_set)); + + return 0; +} diff --git a/src/server/server_finalize.c b/src/server/server_finalize.c new file mode 100644 index 0000000..25ea672 --- /dev/null +++ b/src/server/server_finalize.c @@ -0,0 +1,42 @@ +#include <stdlib.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static void finalize_thread_collection +( + struct JH_server_thread_collection workers [const restrict static 1] +) +{ + free((void *) workers->threads); + + workers->threads_capacity = 0; + + pthread_mutex_destroy(&(workers->mutex)); + pthread_barrier_destroy(&(workers->barrier)); + + workers->currently_running = 0; +} + +static void finalize_socket +( + struct JH_server_socket socket [const restrict static 1] +) +{ + FD_ZERO(&(socket->as_a_set)); + + close(socket->file_descriptor); + + socket->file_descriptor = -1; +} + +void JH_server_finalize +( + struct JH_server server [const restrict static 1] +) +{ + finalize_thread_collection(&(server->workers)); + finalize_socket(&(server->socket)); +} diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c new file mode 100644 index 0000000..4b94fc6 --- /dev/null +++ b/src/server/server_initialize.c @@ -0,0 +1,106 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> + +#include "../core/index.h" +#include "../parameters/parameters.h" + +#include "server.h" + +static int initialize_worker_collection +( + struct JH_server_thread_collection c [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + int error; + + c->threads = (struct JH_server_thread_data *) NULL; + c->threads_capacity = 0; + c->currently_running = 0; + + error = + pthread_mutex_init + ( + &(c->mutex), + (const pthread_mutexattr_t *) NULL + ); + + if (error != 0) + { + JH_FATAL + ( + stderr, + "Unable to initialize worker collection's mutex (error: %d): %s.", + error, + strerror(error) + ); + + return -1; + } + + error = + pthread_barrier_init + ( + &(c->barrier), + (const pthread_barrierattr_t *) NULL, + 2 + ); + + if (error != 0) + { + pthread_mutex_destroy(&(c->mutex)); + + JH_FATAL + ( + stderr, + "Unable to initialize worker collection's barrier (error: %d): %s.", + error, + strerror(error) + ); + + return -1; + } + + return 0; +} + +void initialize_thread_parameters +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + server->thread_params.thread_collection = &(server->workers); + server->thread_params.server_params = params; + server->thread_params.socket = -1; +} + +int JH_server_initialize +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + if (initialize_worker_collection(&(server->workers), params) < 0) + { + return -1; + } + + if + ( + JH_server_socket_open + ( + &(server->socket), + JH_parameters_get_socket_name(params) + ) < 0 + ) + { + return -2; + } + + initialize_thread_parameters(server, params); + + return 0; +} diff --git a/src/server/server_joining_threads.c b/src/server/server_joining_threads.c new file mode 100644 index 0000000..e1d92ca --- /dev/null +++ b/src/server/server_joining_threads.c @@ -0,0 +1,38 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +void JH_server_handle_joining_threads +( + struct JH_server server [const restrict static 1] +) +{ + JH_index i; + + pthread_mutex_lock(&(server->workers.mutex)); + + for (i = 0; i < server->workers.threads_capacity; ++i) + { + if (server->workers.threads[i].state == JH_SERVER_JOINING_THREAD) + { + JH_DEBUG(stderr, 1, "Joining thread %u", i); + + pthread_join(server->workers.threads[i].posix_id, (void **) NULL); + + server->workers.threads[i].state = JH_SERVER_NO_THREAD; + + server->workers.currently_running -= 1; + } + } + + pthread_mutex_unlock(&(server->workers.mutex)); +} diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c new file mode 100644 index 0000000..af007ee --- /dev/null +++ b/src/server/server_new_connection.c @@ -0,0 +1,184 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static int get_new_socket (struct JH_server server [const restrict static 1]) +{ + const int old_errno = errno; + + server->thread_params.socket = + accept + ( + server->socket.file_descriptor, + (struct sockaddr *) NULL, + (socklen_t *) NULL + ); + + if (server->thread_params.socket == -1) + { + JH_ERROR + ( + stderr, + "Unable to accept on the server's socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int get_new_thread (struct JH_server server [const restrict static 1]) +{ + struct JH_server_thread_data * new_threads; + JH_index i; + + pthread_mutex_lock(&(server->workers.mutex)); + + for (i = 0; i < server->workers.threads_capacity; ++i) + { + if (server->workers.threads[i].state == JH_SERVER_NO_THREAD) + { + server->thread_params.thread_id = i; + + pthread_mutex_unlock(&(server->workers.mutex)); + + return 0; + } + } + + if + ( + (server->workers.threads_capacity == JH_INDEX_MAX) + || + ( + (size_t) (server->workers.threads_capacity + 1) + > (SIZE_MAX / sizeof(struct JH_server_thread_data)) + ) + ) + { + JH_S_ERROR + ( + stderr, + "Maximum number of concurrent threads attained, unable to add more." + ); + + pthread_mutex_unlock(&(server->workers.mutex)); + + return -1; + } + + server->thread_params.thread_id = server->workers.threads_capacity; + server->workers.threads_capacity += 1; + + new_threads = + (struct JH_server_thread_data *) realloc + ( + server->workers.threads, + ( + sizeof(struct JH_server_thread_data) + * ((size_t) server->workers.threads_capacity) + ) + ); + + if (new_threads == ((struct JH_server_thread_data *) NULL)) + { + JH_S_ERROR + ( + stderr, + "Reallocation of the threads' data list failed." + ); + + pthread_mutex_unlock(&(server->workers.mutex)); + + return -1; + } + + server->workers.threads = new_threads; + + pthread_mutex_unlock(&(server->workers.mutex)); + + return 0; +} + +static int spawn_thread +( + struct JH_server server [const restrict static 1], + void * (*thread_main) (void *) +) +{ + const JH_index thread_id = server->thread_params.thread_id; + int error; + + server->workers.threads[thread_id].state = JH_SERVER_RUNNING_THREAD; + + error = + pthread_create + ( + &(server->workers.threads[thread_id].posix_id), + (const pthread_attr_t *) NULL, + thread_main, + (void *) &(server->thread_params) + ); + + if (error != 0) + { + JH_ERROR + ( + stderr, + "Unable to spawn thread: %s.", + strerror(error) + ); + + server->workers.threads[thread_id].state = JH_SERVER_NO_THREAD; + + return -1; + } + + pthread_barrier_wait(&(server->workers.barrier)); + + server->workers.currently_running += 1; + + return 0; +} + +int JH_server_handle_new_connection +( + struct JH_server server [const restrict static 1] +) +{ + if (get_new_socket(server) < 0) + { + return -1; + } + + if (get_new_thread(server) < 0) + { + close(server->thread_params.socket); + + return -2; + } + + if (spawn_thread(server, JH_server_worker_main) < 0) + { + close(server->thread_params.socket); + + return -3; + } + + return 0; +} diff --git a/src/server/server_signal.c b/src/server/server_signal.c new file mode 100644 index 0000000..9361382 --- /dev/null +++ b/src/server/server_signal.c @@ -0,0 +1,41 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "server.h" + +static volatile char JH_SERVER_IS_RUNNING = (char) 1; + +static void request_termination (int const signo) +{ + if ((signo == SIGINT) || (signo == SIGTERM)) + { + JH_server_request_termination(); + } +} + +void JH_server_request_termination (void) +{ + JH_SERVER_IS_RUNNING = (char) 0; +} + +int JH_server_is_running (void) +{ + return (int) JH_SERVER_IS_RUNNING; +} + +int JH_server_set_signal_handlers (void) +{ + /* + struct sigaction act; + + act.sa_handler = request_termination; + act.sa_mask = + act.sa_flags = + act.sa_restorer = + */ + + /* TODO */ + + return -1; +} diff --git a/src/server/server_types.h b/src/server/server_types.h new file mode 100644 index 0000000..62e785b --- /dev/null +++ b/src/server/server_types.h @@ -0,0 +1,81 @@ +#ifndef _JH_SERVER_SERVER_TYPES_H_ +#define _JH_SERVER_SERVER_TYPES_H_ + +#include <sys/time.h> +#include <stdio.h> + +#ifndef JH_RUNNING_FRAMA_C + #include <pthread.h> +#endif + +#include "../core/index_types.h" + +#include "../parameters/parameters_types.h" + +#include "../error/error.h" + +#include "../filter/filter_types.h" + +#define JH_SERVER_MAX_RETRIES 10 +#define JH_SERVER_BUFFER_SIZE 0 + +#define JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC 5 +#define JH_SERVER_SOCKET_LISTEN_BACKLOG 5 + +#define JH_SERVER_WORKER_MAX_WAITING_TIME 5 + +enum JH_server_thread_state +{ + JH_SERVER_JOINING_THREAD, + JH_SERVER_RUNNING_THREAD, + JH_SERVER_NO_THREAD +}; + +struct JH_server_thread_data +{ +#ifndef JH_RUNNING_FRAMA_C + pthread_t posix_id; +#endif + enum JH_server_thread_state state; +}; + +struct JH_server_thread_collection +{ + struct JH_server_thread_data * threads; + JH_index threads_capacity; /* protected by merger_mutex */ +#ifndef JH_RUNNING_FRAMA_C + pthread_mutex_t mutex; + pthread_barrier_t barrier; +#endif + JH_index currently_running; +}; + +struct JH_server_socket +{ + int file_descriptor; + fd_set as_a_set; + struct timeval timeout; +}; + +struct JH_server_thread_parameters +{ + struct JH_server_thread_collection * thread_collection; + const struct JH_parameters * server_params; + JH_index thread_id; + int socket; +}; + +struct JH_server_worker +{ + struct JH_server_thread_parameters params; + int downstream_socket; +}; + +struct JH_server +{ + struct JH_server_thread_collection workers; + struct JH_server_socket socket; + struct JH_server_thread_parameters thread_params; +}; + +#endif diff --git a/src/server/server_wait_for_event.c b/src/server/server_wait_for_event.c new file mode 100644 index 0000000..c949438 --- /dev/null +++ b/src/server/server_wait_for_event.c @@ -0,0 +1,62 @@ +#include <sys/select.h> + +#include <errno.h> +#include <stdio.h> +#include <string.h> + +#include "../error/error.h" + +#include "server.h" + +int JH_server_wait_for_event +( + struct JH_server server [const static 1] +) +{ + int ready_fds; + const int old_errno = errno; + fd_set ready_to_read; + + ready_to_read = server->socket.as_a_set; + + /* call to select may alter timeout */ + memset((void *) &(server->socket.timeout), 0, sizeof(struct timeval)); + + server->socket.timeout.tv_sec = JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC; + + errno = 0; + + ready_fds = select + ( + (server->socket.file_descriptor + 1), + &ready_to_read, + (fd_set *) NULL, + (fd_set *) NULL, + &(server->socket.timeout) + ); + + JH_DEBUG(stderr, 1, "SELECT returned: %i, errno is %i.", ready_fds, errno); + + if (errno == EINTR) + { + ready_fds = 0; + } + + if (ready_fds == -1) + { + JH_FATAL + ( + stderr, + "Unable to wait on server socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return ready_fds; +} diff --git a/src/server/server_worker.c b/src/server/server_worker.c new file mode 100644 index 0000000..0542b1f --- /dev/null +++ b/src/server/server_worker.c @@ -0,0 +1,189 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <errno.h> + +#include "../filter/filter.h" + +#include "../parameters/parameters.h" + +#include "server.h" + +static int connect_downstream +( + struct JH_server_worker worker [const restrict static 1] +) +{ + struct sockaddr_un addr; + + const int old_errno = errno; + + errno = 0; + + if ((worker->downstream_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) + { + JH_FATAL + ( + stderr, + "Unable to create socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + memset((void *) &addr, (int) 0, sizeof(addr)); + + addr.sun_family = AF_UNIX; + + strncpy + ( + (char *) addr.sun_path, + JH_parameters_get_dest_socket_name(worker->params.server_params), + (sizeof(addr.sun_path) - ((size_t) 1)) + ); + + errno = 0; + + if + ( + connect + ( + worker->downstream_socket, + (struct sockaddr *) &addr, + sizeof(addr) + ) + == -1 + ) + { + JH_FATAL + ( + stderr, + "Unable to connect to address: %s.", + strerror(errno) + ); + + errno = old_errno; + + close(worker->downstream_socket); + + worker->downstream_socket = -1; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int initialize +( + struct JH_server_worker worker [const restrict static 1], + void * input +) +{ + memcpy + ( + (void *) &(worker->params), + (const void *) input, + sizeof(struct JH_server_thread_parameters) + ); + + pthread_barrier_wait(&(worker->params.thread_collection->barrier)); + + return connect_downstream(worker); +} + +static void finalize +( + struct JH_server_worker worker [const restrict static 1] +) +{ + if (worker->downstream_socket != -1) + { + close(worker->downstream_socket); + + worker->downstream_socket = -1; + } + + if (worker->params.socket != -1) + { + close(worker->params.socket); + + worker->params.socket = -1; + } + + pthread_mutex_lock(&(worker->params.thread_collection->mutex)); + + worker->params.thread_collection->threads[worker->params.thread_id].state = + JH_SERVER_JOINING_THREAD; + + pthread_mutex_unlock(&(worker->params.thread_collection->mutex)); +} + +void * JH_server_worker_main (void * input) +{ + int status; + int timeout_count; + struct JH_filter filter; + struct JH_server_worker worker; + + initialize(&worker, input); + + if (JH_filter_initialize(&filter) < 0) + { + finalize(&worker); + + return NULL; + } + + timeout_count = 0; + + while (JH_server_is_running()) + { + status = + JH_filter_step + ( + &filter, + worker.params.socket, + worker.downstream_socket + ); + + if (status == 0) + { + timeout_count = 0; + } + else if (status == 1) + { + timeout_count += 1; + + if (timeout_count == 2) + { + break; + } + else + { + sleep(JH_SERVER_WORKER_MAX_WAITING_TIME); + } + } + else + { + break; + } + } + + JH_filter_finalize(&filter); + + finalize(&worker); + + return NULL; +} |


