| summaryrefslogtreecommitdiff |
diff options
| author | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-09 15:18:04 +0200 |
|---|---|---|
| committer | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-09 15:18:04 +0200 |
| commit | 980d0b8c4c16b2583e2da052ed964a7170485ce2 (patch) | |
| tree | 1155e51fc4798e22c4cc2eafb05cbf725817a353 | |
Initial commit.
31 files changed, 2098 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..7825260 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,29 @@ +cmake_minimum_required(VERSION 3.0 FATAL_ERROR) + +project("JabberHive - Limiter") + +include(FindPkgConfig) + +add_subdirectory(src) +add_definitions(-D_POSIX_SOURCE) +add_definitions(-D_POSIX_C_SOURCE=200809L) + +set(CMAKE_C_FLAGS $ENV{CFLAGS}) +if(CMAKE_COMPILER_IS_GNUCC) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -Wpedantic -Wconversion") + message(STATUS "GNUCC detected. Adding '-O3' parameter.") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3") +endif() +message(STATUS "CFLAGS=${CMAKE_C_FLAGS}") + +# ${SRC_FILES} is recursively defined in the subdirectories. +# Each subdirectory only adds the source files that are present at its level. +add_executable(jabberhive-limiter ${SRC_FILES}) +set_property(TARGET jabberhive-limiter PROPERTY C_STANDARD 99) +set_property(TARGET jabberhive-limiter PROPERTY C_STANDARD_REQUIRED ON) + +find_package(Threads) +target_link_libraries(jabberhive-limiter ${CMAKE_THREAD_LIBS_INIT}) + +## OPTION HANDLING ############################################################# +# TODO @@ -0,0 +1,27 @@ +Copyright (c) 2017, NathanaĆ«l Sensfelder +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of JabberHive nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..b4d5d39 --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +## What is JabberHive? +JabberHive is a modular ChatBot system. All "modules" are in fact separate +programs linked together using the JabberHive Protocol. Please refer to the +protocol for more information. + +## Component Description +* Filter for a JabberHive network. +* Randomly transforms ?RLR queries into ?RL ones. +* The transformation chance is a run-time parameter. + +## JabberHive Protocol Compatibility +* **Protocol Version(s):** 1. +* **Inbound Connections:** Multiple. +* **Outbound Connections:** Multiple. +* **Pipelining:** No. +* **Behavior:** Gateway. + +## Notes +* Does not correctly reply to Pipelining & Protocol Version requests. + +## Dependencies +- POSIX compliant OS. +- C compiler (with C99 support). +- CMake. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..0b242bc --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,12 @@ +add_subdirectory(error) +add_subdirectory(core) +add_subdirectory(filter) +add_subdirectory(server) +add_subdirectory(parameters) + +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/main.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt new file mode 100644 index 0000000..fa07534 --- /dev/null +++ b/src/core/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/core/index_types.h b/src/core/index_types.h new file mode 100644 index 0000000..9131569 --- /dev/null +++ b/src/core/index_types.h @@ -0,0 +1,27 @@ +#ifndef _JH_CORE_INDEX_TYPES_H_ +#define _JH_CORE_INDEX_TYPES_H_ + +#include "../pervasive.h" + +/* + * JH_index is a replacement for size_t. As many indices are stored for every + * word learned, having control over which type of variable is used to represent + * those indices lets us scale the RAM usage. + */ + +#include <limits.h> +#include <stdint.h> + +/* Must be unsigned. */ +typedef unsigned int JH_index; + +/* Must be > 0. */ +#define JH_INDEX_MAX UINT_MAX + +#ifndef JH_RUNNING_FRAMA_C + #if (JH_INDEX_MAX > SIZE_MAX) + #error "JH_index should not be able to go higher than a size_t variable." + #endif +#endif + +#endif diff --git a/src/error/CMakeLists.txt b/src/error/CMakeLists.txt new file mode 100644 index 0000000..fa07534 --- /dev/null +++ b/src/error/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/error/error.h b/src/error/error.h new file mode 100644 index 0000000..145c838 --- /dev/null +++ b/src/error/error.h @@ -0,0 +1,143 @@ +#ifndef _JH_ERROR_ERROR_H_ +#define _JH_ERROR_ERROR_H_ + +#include <stdio.h> + +#include "../pervasive.h" + +#ifndef JH_DEBUG_PROGRAM_FLOW + #define JH_DEBUG_PROGRAM_FLOW (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_CONFIG + #define JH_DEBUG_CONFIG (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_LEARNING + #define JH_DEBUG_LEARNING (0 || JH_DEBUG_ALL) +#endif + +#ifndef JH_DEBUG_NETWORK + #define JH_DEBUG_NETWORK 1 +#endif + +#ifndef JH_DEBUG_NETWORK + #define JH_DEBUG_NETWORK (0 || JH_DEBUG_ALL) +#endif + +#define JH_ENABLE_WARNINGS_OUTPUT 1 +#define JH_ENABLE_RUNTIME_ERRORS_OUTPUT 1 +#define JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT 1 +#define JH_ENABLE_FATAL_ERROR_OUTPUT 1 + +#ifdef JH_ENABLE_ERROR_LOCATION + #define JH_LOCATION " [" __FILE__ "][" JH_TO_STRING(__LINE__) "]" +#else + #define JH_LOCATION "" +#endif + +#define JH_PRINT_STDERR(io, symbol, str, ...)\ + fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n", __VA_ARGS__); + +/* + * Given that we use preprocessor contants as flags, we can expect the compilers + * to remove the test condition for disabled flags. No need to be shy about + * allowing many debug options. + */ + +#define JH_DEBUG(io, flag, str, ...)\ + JH_ISOLATE\ + (\ + if (flag)\ + {\ + JH_PRINT_STDERR(io, "D", str, __VA_ARGS__);\ + }\ + ) + + +#define JH_WARNING(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_WARNINGS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "W", str, __VA_ARGS__);\ + }\ + ) + +#define JH_ERROR(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "E", str, __VA_ARGS__);\ + }\ + ) + +#define JH_PROG_ERROR(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "P", str, __VA_ARGS__);\ + }\ + ) + +#define JH_FATAL(io, str, ...)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ + {\ + JH_PRINT_STDERR(io, "F", str, __VA_ARGS__);\ + }\ + ) + +/* For outputs without dynamic content (static). ******************************/ + +#define JH_PRINT_S_STDERR(io, symbol, str)\ + fprintf(io, "[" symbol "]" JH_LOCATION " " str "\n"); + +#define JH_S_DEBUG(io, flag, str)\ + JH_ISOLATE\ + (\ + if (flag)\ + {\ + JH_PRINT_S_STDERR(io, "D", str);\ + }\ + ) + +#define JH_S_WARNING(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_WARNINGS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "W", str);\ + }\ + ) + +#define JH_S_ERROR(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_RUNTIME_ERRORS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "E", str);\ + }\ + ) + +#define JH_S_PROG_ERROR(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_PROGRAMMING_ERRORS_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "P", str);\ + }\ + ) + +#define JH_S_FATAL(io, str)\ + JH_ISOLATE\ + (\ + if (JH_ENABLE_FATAL_ERROR_OUTPUT)\ + {\ + JH_PRINT_S_STDERR(io, "F", str);\ + }\ + ) +#endif diff --git a/src/filter/CMakeLists.txt b/src/filter/CMakeLists.txt new file mode 100644 index 0000000..c8add44 --- /dev/null +++ b/src/filter/CMakeLists.txt @@ -0,0 +1,6 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/filter.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) diff --git a/src/filter/filter.c b/src/filter/filter.c new file mode 100644 index 0000000..20adb79 --- /dev/null +++ b/src/filter/filter.c @@ -0,0 +1,465 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "../pervasive.h" +#include "../error/error.h" +#include "../parameters/parameters.h" + +#include "filter.h" + +static const char REQUEST_TEMPLATE[] = {'?', 'R', 'L', 'R', ' '}; + +static int connect_downstream +( + struct JH_limiter_filter filter [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + struct sockaddr_un addr; + + const int old_errno = errno; + + errno = 0; + + if ((filter->downstream_socket = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) + { + JH_FATAL + ( + stderr, + "Unable to create socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + memset((void *) &addr, (int) 0, sizeof(addr)); + + addr.sun_family = AF_UNIX; + + strncpy + ( + (char *) addr.sun_path, + JH_parameters_get_dest_socket_name(params), + (sizeof(addr.sun_path) - ((size_t) 1)) + ); + + errno = 0; + + if + ( + connect + ( + filter->downstream_socket, + (struct sockaddr *) &addr, + sizeof(addr) + ) + == -1 + ) + { + JH_FATAL + ( + stderr, + "Unable to connect to address: %s.", + strerror(errno) + ); + + errno = old_errno; + + close(filter->downstream_socket); + + return -1; + } + + errno = old_errno; + + filter->state = JH_LIMITER_IS_LISTENING_UPSTREAM; + + return 0; +} + +static int listen_to_upstream +( + struct JH_limiter_filter filter [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + ssize_t io_bytes; + const int old_errno = errno; + + for (;;) + { + errno = 0; + io_bytes = + read + ( + filter->upstream_socket, + (void *) (filter->buffer + filter->buffer_index), + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream read error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + if + ( + filter->buffer[filter->buffer_index] + != REQUEST_TEMPLATE[filter->buffer_index] + ) + { + filter->buffer_index += 1; + filter->state = JH_LIMITER_IS_SENDING_DOWNSTREAM; + + return 0; + } + + filter->buffer_index += 1; + + if (filter->buffer_index == 5) + { + if (rand() >= JH_parameters_get_reply_rate(params)) + { + strncpy + ( + filter->buffer, + "?RL ", + (((size_t) 4) * sizeof(char)) + ); + + filter->buffer_index = 4; + } + + filter->state = JH_LIMITER_IS_SENDING_DOWNSTREAM; + + return 0; + } + } + + return -1; +} + +static int send_downstream +( + struct JH_limiter_filter filter [const restrict static 1] +) +{ + ssize_t io_bytes; + const int old_errno = errno; + + if (filter->buffer_index > 0) + { + errno = 0; + + io_bytes = + write + ( + filter->downstream_socket, + (void *) filter->buffer, + (((size_t) filter->buffer_index) * sizeof(char)) + ); + + filter->buffer_index = 0; + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Downstream write error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + } + + for (;;) + { + errno = 0; + io_bytes = + read + ( + filter->upstream_socket, + (void *) filter->buffer, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream read error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = 0; + + io_bytes = write + ( + filter->downstream_socket, + (void *) filter->buffer, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream write error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + if (filter->buffer[0] == '\n') + { + filter->state = JH_LIMITER_IS_SENDING_UPSTREAM; + + return 0; + } + } +} + +static int send_upstream +( + struct JH_limiter_filter filter [const restrict static 1] +) +{ + ssize_t io_bytes; + const int old_errno = errno; + + for (;;) + { + errno = 0; + + io_bytes = + read + ( + filter->downstream_socket, + (void *) filter->buffer, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Downstream read error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = 0; + + io_bytes = write + ( + filter->upstream_socket, + (void *) filter->buffer, + sizeof(char) + ); + + if (io_bytes == -1) + { + JH_ERROR + ( + stderr, + "Upstream write error %d (\"%s\").", + errno, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + switch (filter->buffer_index) + { + case -1: + if (filter->buffer[0] == '\n') + { + filter->buffer_index = 0; + } + break; + + case 0: + if (filter->buffer[0] == '!') + { + filter->buffer_index = 1; + } + else + { + filter->buffer_index = -1; + } + break; + + case 1: + if ((filter->buffer[0] == 'N') || (filter->buffer[0] == 'P')) + { + filter->buffer_index = 2; + } + else + { + filter->buffer_index = -1; + } + break; + + case 2: + if (filter->buffer[0] == ' ') + { + filter->buffer_index = 3; + } + else + { + filter->buffer_index = -1; + } + break; + + case 3: + if (filter->buffer[0] == '\n') + { + filter->buffer_index = 0; + filter->state = JH_LIMITER_IS_LISTENING_UPSTREAM; + + return 0; + } + else + { + filter->buffer_index = -1; + } + break; + + default: + JH_PROG_ERROR + ( + stderr, + "Invalid value for 'filter->buffer_index': %d.", + filter->buffer_index + ); + + filter->buffer_index = 0; + + return -1; + } + } + + return -1; +} + +/******************************************************************************/ +/** EXPORTED ******************************************************************/ +/******************************************************************************/ + +int JH_limiter_filter_step +( + struct JH_limiter_filter filter [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + switch (filter->state) + { + case JH_LIMITER_IS_CONNECTING: + JH_DEBUG(stderr, 1, "<CONNECTING> (index: %d)", filter->buffer_index); + return connect_downstream(filter, params); + + case JH_LIMITER_IS_LISTENING_UPSTREAM: + JH_DEBUG(stderr, 1, "<LISTENING_UP> (index: %d)", filter->buffer_index); + return listen_to_upstream(filter, params); + + case JH_LIMITER_IS_SENDING_DOWNSTREAM: + JH_DEBUG(stderr, 1, "<SENDING_DOWN> (index: %d)", filter->buffer_index); + return send_downstream(filter); + + case JH_LIMITER_IS_SENDING_UPSTREAM: + JH_DEBUG(stderr, 1, "<SENDING_UP> (index: %d)", filter->buffer_index); + return send_upstream(filter); + + default: + return -1; + } +} + +int JH_limiter_filter_initialize +( + struct JH_limiter_filter filter [const restrict static 1], + const int upstream_socket +) +{ + filter->state = JH_LIMITER_IS_CONNECTING; + filter->buffer_index = 0; + filter->upstream_socket = upstream_socket; + filter->downstream_socket = -1; + + return 0; +} + +void JH_limiter_filter_finalize +( + struct JH_limiter_filter filter [const restrict static 1] +) +{ + + if (filter->upstream_socket != -1) + { + close(filter->upstream_socket); + + filter->upstream_socket = -1; + } + + if (filter->downstream_socket != -1) + { + close(filter->downstream_socket); + + filter->downstream_socket = -1; + } +} diff --git a/src/filter/filter.h b/src/filter/filter.h new file mode 100644 index 0000000..5c21791 --- /dev/null +++ b/src/filter/filter.h @@ -0,0 +1,25 @@ +#ifndef _JH_LIMITER_FILTER_H_ +#define _JH_LIMITER_FILTER_H_ + +#include "../parameters/parameters_types.h" + +#include "filter_types.h" + +int JH_limiter_filter_initialize +( + struct JH_limiter_filter filter [const restrict static 1], + const int upstream_socket +); + +int JH_limiter_filter_step +( + struct JH_limiter_filter filter [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +); + +void JH_limiter_filter_finalize +( + struct JH_limiter_filter filter [const restrict static 1] +); + +#endif diff --git a/src/filter/filter_types.h b/src/filter/filter_types.h new file mode 100644 index 0000000..b2c5ecb --- /dev/null +++ b/src/filter/filter_types.h @@ -0,0 +1,22 @@ +#ifndef _JH_LIMITER_FILTER_TYPES_H_ +#define _JH_LIMITER_FILTER_TYPES_H_ + +#define JH_LIMITER_FILTER_BUFFER_SIZE 5 +enum JH_limiter_filter_state +{ + JH_LIMITER_IS_CONNECTING, + JH_LIMITER_IS_LISTENING_UPSTREAM, + JH_LIMITER_IS_SENDING_DOWNSTREAM, + JH_LIMITER_IS_SENDING_UPSTREAM +}; + +struct JH_limiter_filter +{ + enum JH_limiter_filter_state state; + char buffer[JH_LIMITER_FILTER_BUFFER_SIZE]; + int buffer_index; + int upstream_socket; + int downstream_socket; +}; + +#endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..9216104 --- /dev/null +++ b/src/main.c @@ -0,0 +1,54 @@ +#include <sys/socket.h> +#include <sys/un.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> + +#include "error/error.h" +#include "parameters/parameters.h" +#include "server/server.h" + +#include "pervasive.h" + +static void print_help (const char runnable [const restrict static 1]) +{ + printf + ( + "JabberHive - Limiter\n" + "Software Version %d\n" + "Protocol Version %d\n" + "\nUsages:\n" + " JH GATEWAY:\t%s SOCKET_NAME DESTINATION REPLY_RATE\n" + " SHOW HELP:\tAnything else.\n" + "\nParameters:\n" + " SOCKET_NAME:\tValid UNIX socket.\n" + " DESTINATION:\tValid UNIX socket.\n" + " REPLY_RATE:\tInteger [0,100].\n", + JH_PROGRAM_VERSION, + JH_PROTOCOL_VERSION, + runnable + ); +} + + +int main (int const argc, const char * argv [const static argc]) +{ + struct JH_parameters params; + + if (JH_parameters_initialize(¶ms, argc, argv) < 0) + { + print_help(argv[0]); + + return -1; + } + + if (JH_server_main(¶ms) < 0) + { + return -1; + } + + return 0; +} diff --git a/src/parameters/CMakeLists.txt b/src/parameters/CMakeLists.txt new file mode 100644 index 0000000..2aa7ece --- /dev/null +++ b/src/parameters/CMakeLists.txt @@ -0,0 +1,7 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/parameters.c + ${CMAKE_CURRENT_SOURCE_DIR}/parameters_getters.c +) +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/parameters/parameters.c b/src/parameters/parameters.c new file mode 100644 index 0000000..b856c3e --- /dev/null +++ b/src/parameters/parameters.c @@ -0,0 +1,145 @@ +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <limits.h> + +#include "../error/error.h" + +#include "parameters.h" + +static int parse_reply_rate +( + struct JH_parameters param [const restrict static 1], + const char argv [const restrict] +) +{ + long long int input; + const int old_errno = errno; + + errno = 0; + + input = strtoll(argv, (char **) NULL, 10); + + if + ( + (errno != 0) + || (input > (long long int) 100) + || (input < 0) + ) + { + JH_FATAL + ( + stderr, + "Invalid or value for parameter 'reply_rate', accepted " + "range is " + "[0, %hu] (integer).", + 100 + ); + + errno = old_errno; + + return -1; + } + + param->reply_rate = + (int) + ( + ((double) RAND_MAX) + * (((double) input) * ((double) 0.01)) + ); + + errno = old_errno; + + return 0; +} + +static void set_default_to_all_fields +( + struct JH_parameters param [const restrict static 1] +) +{ + param->reply_rate = -1; + param->socket_name = (const char *) NULL; + param->dest_socket_name = (const char *) NULL; +} + +static int is_valid +( + struct JH_parameters param [const restrict static 1] +) +{ + int valid; + + valid = 1; + + if (param->socket_name == (const char *) NULL) + { + JH_S_FATAL(stderr, "Missing parameter: This entity's socket name."); + + valid = 0; + } + + if (param->dest_socket_name == (const char *) NULL) + { + JH_S_FATAL(stderr, "Missing parameter: The destination's socket name."); + + valid = 0; + } + + if (param->reply_rate == -1) + { + JH_S_FATAL(stderr, "Missing parameter: The reply rate."); + + valid = 0; + } + + return valid; +} + +static void set_parameters +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +) +{ + if (argc < 2) + { + return; + } + + param->socket_name = argv[1]; + + if (argc < 3) + { + return; + } + + param->dest_socket_name = argv[2]; + + if (argc < 4) + { + return; + } + + parse_reply_rate(param, argv[3]); +} + +int JH_parameters_initialize +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +) +{ + set_default_to_all_fields(param); + + set_parameters(param, argc, argv); + + if (!is_valid(param)) + { + return -1; + } + + return 0; +} diff --git a/src/parameters/parameters.h b/src/parameters/parameters.h new file mode 100644 index 0000000..107350e --- /dev/null +++ b/src/parameters/parameters.h @@ -0,0 +1,28 @@ +#ifndef _JH_CLI_PARAMETERS_H_ +#define _JH_CLI_PARAMETERS_H_ + +#include "parameters_types.h" + +int JH_parameters_initialize +( + struct JH_parameters param [const restrict static 1], + int const argc, + const char * argv [const static argc] +); + +const int JH_parameters_get_reply_rate +( + const struct JH_parameters param [const restrict static 1] +); + +const char * JH_parameters_get_socket_name +( + const struct JH_parameters param [const restrict static 1] +); + +const char * JH_parameters_get_dest_socket_name +( + const struct JH_parameters param [const restrict static 1] +); + +#endif diff --git a/src/parameters/parameters_getters.c b/src/parameters/parameters_getters.c new file mode 100644 index 0000000..0b3ec9e --- /dev/null +++ b/src/parameters/parameters_getters.c @@ -0,0 +1,25 @@ +#include "parameters.h" + +const int JH_parameters_get_reply_rate +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->reply_rate; +} + +const char * JH_parameters_get_socket_name +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->socket_name; +} + +const char * JH_parameters_get_dest_socket_name +( + const struct JH_parameters param [const restrict static 1] +) +{ + return param->dest_socket_name; +} diff --git a/src/parameters/parameters_types.h b/src/parameters/parameters_types.h new file mode 100644 index 0000000..6584b3e --- /dev/null +++ b/src/parameters/parameters_types.h @@ -0,0 +1,15 @@ +#ifndef _JH_CLI_PARAMETERS_TYPES_H_ +#define _JH_CLI_PARAMETERS_TYPES_H_ + +#define JH_PARAMETERS_COUNT 3 + +struct JH_parameters +{ + int reply_rate; + + /* JH **********************************************************************/ + const char * restrict socket_name; + const char * restrict dest_socket_name; +}; + +#endif diff --git a/src/pervasive.h b/src/pervasive.h new file mode 100644 index 0000000..27d832d --- /dev/null +++ b/src/pervasive.h @@ -0,0 +1,28 @@ +#ifndef _JH_PERVASIVE_H_ +#define _JH_PERVASIVE_H_ + +#include <string.h> + +#define JH_PROGRAM_VERSION 1 +#define JH_PROTOCOL_VERSION 1 + +#ifdef __FRAMA_C__ + #define JH_RUNNING_FRAMA_C 1 +#endif + +#define JH_DEBUG_ALL 1 + +#ifndef JH_DEBUG_ALL + #define JH_DEBUG_ALL 0 +#endif + +#define JH__TO_STRING(x) #x +#define JH_TO_STRING(x) JH__TO_STRING(x) +#define JH_ISOLATE(a) do {a} while (0) + +/* strncmp stops at '\0' and strlen does not count '\0'. */ +#define JH_IS_PREFIX(a, b) (strncmp(a, b, strlen(a)) == 0) + +#define JH_STRING_EQUALS(a, b) (strcmp(a, b) == 0) + +#endif diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt new file mode 100644 index 0000000..9ade789 --- /dev/null +++ b/src/server/CMakeLists.txt @@ -0,0 +1,15 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/server.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_create_socket.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_finalize.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_initialize.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_joining_threads.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_new_connection.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_signal.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_wait_for_event.c + ${CMAKE_CURRENT_SOURCE_DIR}/server_worker.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/server/server.c b/src/server/server.c new file mode 100644 index 0000000..640584f --- /dev/null +++ b/src/server/server.c @@ -0,0 +1,101 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +int JH_server_main +( + const struct JH_parameters params [const restrict static 1] +) +{ + struct JH_server server; + JH_index retries; + + retries = 0; + /* TODO + if (JH_server_set_signal_handlers < 0) + { + return -1; + } + */ + + if (JH_server_initialize(&server, params) < 0) + { + return -1; + } + + while (JH_server_is_running()) + { + switch (JH_server_wait_for_event(&server)) + { + case 0: /* Timed out or signal'd. */ + JH_S_DEBUG(stderr, 1, "Timed out..."); + JH_server_handle_joining_threads(&server); + + retries = 0; + + break; + + case 1: /* New client attempted connection. */ + JH_S_DEBUG(stderr, 1, "New connection."); + JH_server_handle_joining_threads(&server); + (void) JH_server_handle_new_connection(&server); + + retries = 0; + + break; + + case -1: /* Something bad happened. */ + retries += 1; + + if (retries == JH_SERVER_MAX_RETRIES) + { + JH_server_finalize(&server); + + return -1; + } + + break; + + default: + JH_S_PROG_ERROR + ( + stderr, + "Unexpected wait_for_event return value." + ); + + break; + } + } + + /* Waiting for the threads to join... */ + while (server.workers.currently_running > 0) + { + switch (JH_server_wait_for_event(&server)) + { + case 0: /* Timed out. */ + case 1: /* New client attempted connection. */ + JH_server_handle_joining_threads(&server); + break; + + case -1: /* Something bad happened. */ + retries += 1; + + if (retries == JH_SERVER_MAX_RETRIES) + { + JH_server_finalize(&server); + + return -1; + } + break; + } + } + + JH_server_finalize(&server); + + return 0; +} + diff --git a/src/server/server.h b/src/server/server.h new file mode 100644 index 0000000..9d7f4b1 --- /dev/null +++ b/src/server/server.h @@ -0,0 +1,83 @@ +#ifndef _JH_SERVER_SERVER_H_ +#define _JH_SERVER_SERVER_H_ + +#include "../parameters/parameters_types.h" + +#include "server_types.h" + +int JH_server_initialize +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +); + +int JH_server_socket_open +( + struct JH_server_socket server_socket [const restrict static 1], + const char socket_name [const restrict static 1] +); + +void JH_server_request_termination (void); +int JH_server_is_running (void); +int JH_server_set_signal_handlers (void); + +int JH_server_main +( + const struct JH_parameters params [const restrict static 1] +); + +void JH_server_finalize (struct JH_server [const restrict static 1]); + +int JH_server_wait_for_event +( + struct JH_server server [const restrict static 1] +); + +void JH_server_handle_joining_threads +( + struct JH_server server [const restrict static 1] +); + +int JH_server_handle_new_connection +( + struct JH_server server [const restrict static 1] +); + +void * JH_server_worker_main (void * input); + +int JH_server_worker_receive +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_handle_request +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_pipelining_support +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_confirm_protocol_version +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_positive +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_negative +( + struct JH_server_worker worker [const restrict static 1] +); + +int JH_server_worker_send_generated_reply +( + struct JH_server_worker worker [const restrict static 1] +); + +#endif diff --git a/src/server/server_create_socket.c b/src/server/server_create_socket.c new file mode 100644 index 0000000..5e0c00b --- /dev/null +++ b/src/server/server_create_socket.c @@ -0,0 +1,195 @@ +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +#include <sys/socket.h> +#include <sys/un.h> + +#include "../error/error.h" + +#include "server.h" + +static int create_socket (int result [const restrict static 1]) +{ + const int old_errno = errno; + + errno = 0; + *result = socket(AF_UNIX, SOCK_STREAM, 0); + + if (*result == -1) + { + JH_FATAL + ( + stderr, + "Unable to create server socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int bind_socket +( + const int socket, + const char socket_name [const restrict static 1] +) +{ + struct sockaddr_un addr; + const int old_errno = errno; + + errno = 0; + memset(&addr, 0, sizeof(struct sockaddr_un)); + + addr.sun_family = AF_UNIX; + + strncpy + ( + (void *) addr.sun_path, + (const void *) socket_name, + (sizeof(addr.sun_path) - 1) + ); + + errno = old_errno; + + if + ( + bind + ( + socket, + (const struct sockaddr *) &addr, + (socklen_t) sizeof(struct sockaddr_un) + ) != 0 + ) + { + JH_FATAL + ( + stderr, + "Unable to bind server socket to %s: %s.", + socket_name, + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int set_socket_to_unblocking (const int socket) +{ + int current_flags; + const int old_errno = errno; + + current_flags = fcntl(socket, F_GETFD); + + if (current_flags == -1) + { + JH_FATAL + ( + stderr, + "Unable to get server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + /* current_flags = current_flags & (~O_NONBLOCK); */ + + current_flags = fcntl(socket, F_SETFD, (current_flags | O_NONBLOCK)); + + if (current_flags == -1) + { + JH_FATAL + ( + stderr, + "Unable to set server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -2; + } + + errno = old_errno; + + return 0; +} + +static int set_socket_as_listener (const int socket) +{ + const int old_errno = errno; + + if (listen(socket, JH_SERVER_SOCKET_LISTEN_BACKLOG) != 0) + { + JH_FATAL + ( + stderr, + "Unable to set server socket properties: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +int JH_server_socket_open +( + struct JH_server_socket server_socket [const restrict static 1], + const char socket_name [const restrict static 1] +) +{ + printf("\"%s\"\n", socket_name); + if (create_socket(&(server_socket->file_descriptor)) < 0) + { + return -1; + } + + if (bind_socket(server_socket->file_descriptor, socket_name) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + if (set_socket_to_unblocking(server_socket->file_descriptor) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + if (set_socket_as_listener(server_socket->file_descriptor) < 0) + { + close(server_socket->file_descriptor); + + return -1; + } + + FD_ZERO(&(server_socket->as_a_set)); + FD_SET(server_socket->file_descriptor, &(server_socket->as_a_set)); + + return 0; +} diff --git a/src/server/server_finalize.c b/src/server/server_finalize.c new file mode 100644 index 0000000..25ea672 --- /dev/null +++ b/src/server/server_finalize.c @@ -0,0 +1,42 @@ +#include <stdlib.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static void finalize_thread_collection +( + struct JH_server_thread_collection workers [const restrict static 1] +) +{ + free((void *) workers->threads); + + workers->threads_capacity = 0; + + pthread_mutex_destroy(&(workers->mutex)); + pthread_barrier_destroy(&(workers->barrier)); + + workers->currently_running = 0; +} + +static void finalize_socket +( + struct JH_server_socket socket [const restrict static 1] +) +{ + FD_ZERO(&(socket->as_a_set)); + + close(socket->file_descriptor); + + socket->file_descriptor = -1; +} + +void JH_server_finalize +( + struct JH_server server [const restrict static 1] +) +{ + finalize_thread_collection(&(server->workers)); + finalize_socket(&(server->socket)); +} diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c new file mode 100644 index 0000000..a125046 --- /dev/null +++ b/src/server/server_initialize.c @@ -0,0 +1,99 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static int initialize_worker_collection +( + struct JH_server_thread_collection c [const restrict static 1] +) +{ + int error; + + c->threads = (struct JH_server_thread_data *) NULL; + c->threads_capacity = 0; + c->currently_running = 0; + + error = + pthread_mutex_init + ( + &(c->mutex), + (const pthread_mutexattr_t *) NULL + ); + + if (error != 0) + { + JH_FATAL + ( + stderr, + "Unable to initialize worker collection's mutex: %s.", + strerror(error) + ); + + return -1; + } + + error = + pthread_barrier_init + ( + &(c->barrier), + (const pthread_barrierattr_t *) NULL, + 2 + ); + + if (error != 0) + { + JH_FATAL + ( + stderr, + "[F] Unable to initialize worker collection's barrier: %s.", + strerror(error) + ); + + return -1; + } + + return 0; +} + +void initialize_thread_parameters +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + server->thread_params.thread_collection = &(server->workers); + server->thread_params.server_params = params; + server->thread_params.socket = -1; +} + +int JH_server_initialize +( + struct JH_server server [const restrict static 1], + const struct JH_parameters params [const restrict static 1] +) +{ + if (initialize_worker_collection(&(server->workers)) < 0) + { + return -1; + } + + if + ( + JH_server_socket_open + ( + &(server->socket), + JH_parameters_get_socket_name(params) + ) < 0 + ) + { + return -2; + } + + initialize_thread_parameters(server, params); + + return 0; +} diff --git a/src/server/server_joining_threads.c b/src/server/server_joining_threads.c new file mode 100644 index 0000000..e1d92ca --- /dev/null +++ b/src/server/server_joining_threads.c @@ -0,0 +1,38 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +void JH_server_handle_joining_threads +( + struct JH_server server [const restrict static 1] +) +{ + JH_index i; + + pthread_mutex_lock(&(server->workers.mutex)); + + for (i = 0; i < server->workers.threads_capacity; ++i) + { + if (server->workers.threads[i].state == JH_SERVER_JOINING_THREAD) + { + JH_DEBUG(stderr, 1, "Joining thread %u", i); + + pthread_join(server->workers.threads[i].posix_id, (void **) NULL); + + server->workers.threads[i].state = JH_SERVER_NO_THREAD; + + server->workers.currently_running -= 1; + } + } + + pthread_mutex_unlock(&(server->workers.mutex)); +} diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c new file mode 100644 index 0000000..23a2770 --- /dev/null +++ b/src/server/server_new_connection.c @@ -0,0 +1,180 @@ +#include <sys/socket.h> + +#include <errno.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +#include "../parameters/parameters.h" + +#include "server.h" + +static int get_new_socket (struct JH_server server [const restrict static 1]) +{ + const int old_errno = errno; + + server->thread_params.socket = + accept + ( + server->socket.file_descriptor, + (struct sockaddr *) NULL, + (socklen_t *) NULL + ); + + if (server->thread_params.socket == -1) + { + JH_ERROR + ( + stderr, + "Unable to accept on the server's socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return 0; +} + +static int get_new_thread (struct JH_server server [const restrict static 1]) +{ + struct JH_server_thread_data * new_threads; + JH_index i; + + pthread_mutex_lock(&(server->workers.mutex)); + + for (i = 0; i < server->workers.threads_capacity; ++i) + { + if (server->workers.threads[i].state == JH_SERVER_NO_THREAD) + { + server->thread_params.thread_id = i; + + pthread_mutex_unlock(&(server->workers.mutex)); + + return 0; + } + } + + if + ( + (server->workers.threads_capacity == JH_INDEX_MAX) + || + ( + (size_t) (server->workers.threads_capacity + 1) + > (SIZE_MAX / sizeof(struct JH_server_thread_data)) + ) + ) + { + JH_S_ERROR + ( + stderr, + "Maximum number of concurrent threads attained, unable to add more." + ); + + pthread_mutex_unlock(&(server->workers.mutex)); + + return -1; + } + + server->thread_params.thread_id = server->workers.threads_capacity; + server->workers.threads_capacity += 1; + + new_threads = + (struct JH_server_thread_data *) realloc + ( + server->workers.threads, + ( + sizeof(struct JH_server_thread_data) + * ((size_t) server->workers.threads_capacity) + ) + ); + + if (new_threads == ((struct JH_server_thread_data *) NULL)) + { + JH_S_ERROR + ( + stderr, + "Reallocation of the threads' data list failed." + ); + + pthread_mutex_unlock(&(server->workers.mutex)); + + return -1; + } + + server->workers.threads = new_threads; + + pthread_mutex_unlock(&(server->workers.mutex)); + + return 0; +} + +static int spawn_thread (struct JH_server server [const restrict static 1]) +{ + const JH_index thread_id = server->thread_params.thread_id; + int error; + + server->workers.threads[thread_id].state = JH_SERVER_RUNNING_THREAD; + + error = + pthread_create + ( + &(server->workers.threads[thread_id].posix_id), + (const pthread_attr_t *) NULL, + JH_server_worker_main, + (void *) &(server->thread_params) + ); + + if (error != 0) + { + JH_ERROR + ( + stderr, + "Unable to spawn thread: %s.", + strerror(error) + ); + + server->workers.threads[thread_id].state = JH_SERVER_NO_THREAD; + + return -1; + } + + pthread_barrier_wait(&(server->workers.barrier)); + + server->workers.currently_running += 1; + + return 0; +} + +int JH_server_handle_new_connection +( + struct JH_server server [const restrict static 1] +) +{ + if (get_new_socket(server) < 0) + { + return -1; + } + + if (get_new_thread(server) < 0) + { + close(server->thread_params.socket); + + return -2; + } + + if (spawn_thread(server) < 0) + { + close(server->thread_params.socket); + + return -3; + } + + return 0; +} diff --git a/src/server/server_signal.c b/src/server/server_signal.c new file mode 100644 index 0000000..9361382 --- /dev/null +++ b/src/server/server_signal.c @@ -0,0 +1,41 @@ +#include <signal.h> +#include <string.h> +#include <stdio.h> + +#include "server.h" + +static volatile char JH_SERVER_IS_RUNNING = (char) 1; + +static void request_termination (int const signo) +{ + if ((signo == SIGINT) || (signo == SIGTERM)) + { + JH_server_request_termination(); + } +} + +void JH_server_request_termination (void) +{ + JH_SERVER_IS_RUNNING = (char) 0; +} + +int JH_server_is_running (void) +{ + return (int) JH_SERVER_IS_RUNNING; +} + +int JH_server_set_signal_handlers (void) +{ + /* + struct sigaction act; + + act.sa_handler = request_termination; + act.sa_mask = + act.sa_flags = + act.sa_restorer = + */ + + /* TODO */ + + return -1; +} diff --git a/src/server/server_types.h b/src/server/server_types.h new file mode 100644 index 0000000..f43e47d --- /dev/null +++ b/src/server/server_types.h @@ -0,0 +1,75 @@ +#ifndef _JH_SERVER_SERVER_TYPES_H_ +#define _JH_SERVER_SERVER_TYPES_H_ + +#include <sys/time.h> + +#ifndef JH_RUNNING_FRAMA_C + #include <pthread.h> +#endif + +#include "../core/index_types.h" + +#include "../parameters/parameters_types.h" + +#include "../error/error.h" + +#define JH_SERVER_MAX_RETRIES 10 +#define JH_SERVER_BUFFER_SIZE 0 + +#define JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC 5 +#define JH_SERVER_SOCKET_LISTEN_BACKLOG 5 + +enum JH_server_thread_state +{ + JH_SERVER_JOINING_THREAD, + JH_SERVER_RUNNING_THREAD, + JH_SERVER_NO_THREAD +}; + +struct JH_server_thread_data +{ +#ifndef JH_RUNNING_FRAMA_C + pthread_t posix_id; +#endif + enum JH_server_thread_state state; +}; + +struct JH_server_thread_collection +{ + struct JH_server_thread_data * threads; + JH_index threads_capacity; +#ifndef JH_RUNNING_FRAMA_C + pthread_mutex_t mutex; + pthread_barrier_t barrier; +#endif + JH_index currently_running; +}; + +struct JH_server_socket +{ + int file_descriptor; + fd_set as_a_set; + struct timeval timeout; +}; + +struct JH_server_thread_parameters +{ + struct JH_server_thread_collection * thread_collection; + const struct JH_parameters * server_params; + JH_index thread_id; + int socket; +}; + +struct JH_server_worker +{ + struct JH_server_thread_parameters params; +}; + +struct JH_server +{ + struct JH_server_thread_collection workers; + struct JH_server_socket socket; + struct JH_server_thread_parameters thread_params; +}; + +#endif diff --git a/src/server/server_wait_for_event.c b/src/server/server_wait_for_event.c new file mode 100644 index 0000000..c949438 --- /dev/null +++ b/src/server/server_wait_for_event.c @@ -0,0 +1,62 @@ +#include <sys/select.h> + +#include <errno.h> +#include <stdio.h> +#include <string.h> + +#include "../error/error.h" + +#include "server.h" + +int JH_server_wait_for_event +( + struct JH_server server [const static 1] +) +{ + int ready_fds; + const int old_errno = errno; + fd_set ready_to_read; + + ready_to_read = server->socket.as_a_set; + + /* call to select may alter timeout */ + memset((void *) &(server->socket.timeout), 0, sizeof(struct timeval)); + + server->socket.timeout.tv_sec = JH_SERVER_SOCKET_ACCEPT_TIMEOUT_SEC; + + errno = 0; + + ready_fds = select + ( + (server->socket.file_descriptor + 1), + &ready_to_read, + (fd_set *) NULL, + (fd_set *) NULL, + &(server->socket.timeout) + ); + + JH_DEBUG(stderr, 1, "SELECT returned: %i, errno is %i.", ready_fds, errno); + + if (errno == EINTR) + { + ready_fds = 0; + } + + if (ready_fds == -1) + { + JH_FATAL + ( + stderr, + "Unable to wait on server socket: %s.", + strerror(errno) + ); + + errno = old_errno; + + return -1; + } + + errno = old_errno; + + return ready_fds; +} diff --git a/src/server/server_worker.c b/src/server/server_worker.c new file mode 100644 index 0000000..12387a7 --- /dev/null +++ b/src/server/server_worker.c @@ -0,0 +1,73 @@ +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <errno.h> + +#include "../filter/filter.h" + +#include "server.h" + +static int initialize +( + struct JH_server_worker worker [const restrict static 1], + void * input +) +{ + const int old_errno = errno; + + memcpy + ( + (void *) &(worker->params), + (const void *) input, + sizeof(struct JH_server_thread_parameters) + ); + + pthread_barrier_wait(&(worker->params.thread_collection->barrier)); + + return 0; +} + +static void finalize +( + struct JH_server_worker worker [const restrict static 1] +) +{ + close(worker->params.socket); + + pthread_mutex_lock(&(worker->params.thread_collection->mutex)); + + worker->params.thread_collection->threads[worker->params.thread_id].state = + JH_SERVER_JOINING_THREAD; + + pthread_mutex_unlock(&(worker->params.thread_collection->mutex)); +} + +void * JH_server_worker_main (void * input) +{ + struct JH_limiter_filter filter; + struct JH_server_worker worker; + + initialize(&worker, input); + + if (JH_limiter_filter_initialize(&filter, worker.params.socket) < 0) + { + finalize(&worker); + + return NULL; + } + + while (JH_server_is_running()) + { + if (JH_limiter_filter_step(&filter, worker.params.server_params) < 0) + { + break; + } + } + + JH_limiter_filter_finalize(&filter); + + finalize(&worker); + + return NULL; +} |


