diff options
author | Michal Kubecek <mkubecek@suse.cz> | 2015-04-13 09:21:39 +0200 |
---|---|---|
committer | Michal Kubecek <mkubecek@suse.cz> | 2015-04-13 09:21:39 +0200 |
commit | e2bc6f4153813cc570ae814c8ddb74628009b488 (patch) | |
tree | a40b171be1d859c2232ccc94f758010f9ae54d3c /src/sockets/connection_table.cpp | |
download | twinkle-e2bc6f4153813cc570ae814c8ddb74628009b488.tar twinkle-e2bc6f4153813cc570ae814c8ddb74628009b488.tar.gz twinkle-e2bc6f4153813cc570ae814c8ddb74628009b488.tar.lz twinkle-e2bc6f4153813cc570ae814c8ddb74628009b488.tar.xz twinkle-e2bc6f4153813cc570ae814c8ddb74628009b488.zip |
initial checkin
Check in contents of upstream 1.4.2 tarball, exclude generated files.
Diffstat (limited to 'src/sockets/connection_table.cpp')
-rw-r--r-- | src/sockets/connection_table.cpp | 411 |
1 files changed, 411 insertions, 0 deletions
diff --git a/src/sockets/connection_table.cpp b/src/sockets/connection_table.cpp new file mode 100644 index 0000000..4211eb5 --- /dev/null +++ b/src/sockets/connection_table.cpp @@ -0,0 +1,411 @@ +/* + Copyright (C) 2005-2009 Michel de Boer <michel@twinklephone.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "connection_table.h" + +#include <algorithm> +#include <sys/select.h> +#include <sys/types.h> +#include <sys/time.h> +#include <cerrno> +#include <iostream> +#include <cstdlib> +#include <fcntl.h> + +#include "log.h" +#include "protocol.h" +#include "util.h" +#include "audits/memman.h" + +using namespace std; + +extern t_connection_table *connection_table; + +void t_connection_table::create_pipe(int p[2]) { + if (pipe(p) == -1) { + string err = get_error_str(errno); + cerr << "FATAL: t_connection_table - Cannot create pipe.\n"; + cerr << err << endl; + exit(-1); + } + + if (fcntl(p[0], F_SETFL, O_NONBLOCK) == -1) { + string err = get_error_str(errno); + cerr << "FATAL: t_connection_table - fcntl fails on read side of pipe.\n"; + cerr << err << endl; + exit(-1); + } + + if (fcntl(p[1], F_SETFL, O_NONBLOCK) == -1) { + string err = get_error_str(errno); + cerr << "FATAL: t_connection_table - fcntl fails on write side of pipe.\n"; + cerr << err << endl; + exit(-1); + } +} + +void t_connection_table::signal_modification_read(void) { + t_mutex_guard guard(mtx_connections_); + + // Write a byte to the modified pipe, so a select can be retried. + char x = 'x'; + (void)write(fd_pipe_modified_read_[1], &x, 1); +} + +void t_connection_table::signal_modification_write(void) { + t_mutex_guard guard(mtx_connections_); + + // Write a byte to the modified pipe, so a select can be retried. + char x = 'x'; + (void)write(fd_pipe_modified_write_[1], &x, 1); +} + +void t_connection_table::signal_quit(void) { + t_mutex_guard guard(mtx_connections_); + + // Write a byte to the quit pipe, so a select can be halted. + char x = 'x'; + (void)write(fd_pipe_quit_read_[1], &x, 1); + (void)write(fd_pipe_quit_write_[1], &x, 1); + + terminated_ = true; +} + +t_recursive_mutex t_connection_table::mtx_connections_; + +t_connection_table::t_connection_table() : + terminated_(false) +{ + create_pipe(fd_pipe_modified_read_); + create_pipe(fd_pipe_modified_write_); + create_pipe(fd_pipe_quit_read_); + create_pipe(fd_pipe_quit_write_); +} + +t_connection_table::~t_connection_table() { + t_mutex_guard guard(mtx_connections_); + + for (list<t_connection *>::iterator it = connections_.begin(); + it != connections_.end(); ++it) + { + MEMMAN_DELETE(*it); + delete *it; + } +} + +void t_connection_table::unlock(void) const { + mtx_connections_.unlock(); +} + +bool t_connection_table::empty(void) const { + t_mutex_guard guard(mtx_connections_); + return connections_.empty(); +} + +t_connection_table::size_type t_connection_table::size(void) const { + t_mutex_guard guard(mtx_connections_); + return connections_.size(); +} + +void t_connection_table::add_connection(t_connection *connection) { + t_mutex_guard guard(mtx_connections_); + connections_.push_back(connection); + signal_modification_read(); + signal_modification_write(); +} + +void t_connection_table::remove_connection(t_connection *connection) { + t_mutex_guard guard(mtx_connections_); + connections_.remove(connection); + signal_modification_read(); + signal_modification_write(); +} + +t_connection *t_connection_table::get_connection(unsigned long remote_addr, + unsigned short remote_port) +{ + mtx_connections_.lock(); + + t_connection *found_connection = NULL; + list<t_connection *> broken_connections; + + for (list<t_connection *>::iterator it = connections_.begin(); + it != connections_.end(); ++it) + { + unsigned long addr; + unsigned short port; + + if ((*it)->may_reuse()) { + try { + t_socket *socket = (*it)->get_socket(); + t_socket_tcp *tcp_socket = dynamic_cast<t_socket_tcp *>(socket); + + if (tcp_socket) { + tcp_socket->get_remote_address(addr, port); + if (addr == remote_addr && port == remote_port) { + found_connection = *it; + break; + } + } + } catch (int err) { + // This should never happen. + cerr << "Cannot get remote address of socket." << endl; + + // Destroy and remove connection as it is probably broken. + broken_connections.push_back(*it); + } + } + } + + // Clear broken connections + for (list<t_connection *>::iterator it = broken_connections.begin(); + it != broken_connections.end(); ++it) + { + remove_connection(*it); + MEMMAN_DELETE(*it); + delete *it; + } + + if (!found_connection) mtx_connections_.unlock(); + return found_connection; +} + +list<t_connection *> t_connection_table::select_read(struct timeval *timeout) const { + fd_set read_fds; + int nfds = 0; + bool retry = true; + list<t_connection *> result; + + // Empty modification pipe + char pipe_buf; + while (read(fd_pipe_modified_read_[0], &pipe_buf, 1) > 0); + + while (retry) { + FD_ZERO(&read_fds); + + // Add modification pipe so select can be restarted when the + // connection table modifies. + FD_SET(fd_pipe_modified_read_[0], &read_fds); + nfds = fd_pipe_modified_read_[0]; + + // Add quit pipe so select can quit on demand. + FD_SET(fd_pipe_quit_read_[0], &read_fds); + nfds = max(nfds, fd_pipe_quit_read_[0]); + + mtx_connections_.lock(); + + for (list<t_connection *>::const_iterator it = connections_.begin(); + it != connections_.end(); ++it) + { + t_socket *socket = (*it)->get_socket(); + int fd = socket->get_descriptor(); + FD_SET(fd, &read_fds); + nfds = max(nfds, fd); + } + + mtx_connections_.unlock(); + + int ret = select(nfds + 1, &read_fds, NULL, NULL, timeout); + if (ret < 0) throw errno; + + if (FD_ISSET(fd_pipe_quit_read_[0], &read_fds)) { + // Quit was signalled, so stop immediately. + break; + } + + mtx_connections_.lock(); + + // Determine which sockets have become readable + for (list<t_connection *>::const_iterator it = connections_.begin(); + it != connections_.end(); ++it) + { + t_socket *socket = (*it)->get_socket(); + int fd = socket->get_descriptor(); + if (FD_ISSET(fd, &read_fds)) { + result.push_back(*it); + } + } + + if (!result.empty()) { + // Connections have become readable, so return to the caller. + retry = false; + } else { + mtx_connections_.unlock(); + + // No connections have become readable. Check signal descriptors + if (FD_ISSET(fd_pipe_modified_read_[0], &read_fds)) { + // The connection table is modified. Retry select. + read(fd_pipe_modified_read_[0], &pipe_buf, 1); + } else { + // This should never happen. + cerr << "ERROR: select_read returned without any file descriptor." << endl; + } + } + } + + return result; +} + +list<t_connection *> t_connection_table::select_write(struct timeval *timeout) const { + fd_set read_fds; + fd_set write_fds; + int nfds = 0; + bool retry = true; + list<t_connection *> result; + + // Empty modification pipe + char pipe_buf; + while (read(fd_pipe_modified_write_[0], &pipe_buf, 1) > 0); + + while (retry) { + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + + // Add modification pipe so select can be restarted when the + // connection table modifies. + FD_SET(fd_pipe_modified_write_[0], &read_fds); + nfds = fd_pipe_modified_write_[0]; + + // Add quit pipe so select can quit on demand. + FD_SET(fd_pipe_quit_write_[0], &read_fds); + nfds = max(nfds, fd_pipe_quit_write_[0]); + + mtx_connections_.lock(); + + for (list<t_connection *>::const_iterator it = connections_.begin(); + it != connections_.end(); ++it) + { + if ((*it)->has_data_to_send()) { + t_socket *socket = (*it)->get_socket(); + int fd = socket->get_descriptor(); + FD_SET(fd, &write_fds); + nfds = max(nfds, fd); + } + } + + mtx_connections_.unlock(); + + int ret = select(nfds + 1, &read_fds, &write_fds, NULL, timeout); + if (ret < 0) throw errno; + + if (FD_ISSET(fd_pipe_quit_write_[0], &read_fds)) { + // Quit was signalled, so stop immediately. + break; + } + + mtx_connections_.lock(); + + // Determine which sockets have become writable + for (list<t_connection *>::const_iterator it = connections_.begin(); + it != connections_.end(); ++it) + { + t_socket *socket = (*it)->get_socket(); + int fd = socket->get_descriptor(); + if (FD_ISSET(fd, &write_fds)) { + result.push_back(*it); + } + } + + if (!result.empty()) { + // Connections have become writable, so return to the caller. + retry = false; + } else { + mtx_connections_.unlock(); + + // No connections have become writable. Check signal descriptors + if (FD_ISSET(fd_pipe_modified_write_[0], &read_fds)) { + // The connection table is modified. Retry select. + read(fd_pipe_modified_write_[0], &pipe_buf, 1); + } else { + // This should never happen. + cerr << "ERROR: select_write returned without any file descriptor." << endl; + } + } + } + + return result; +} + +void t_connection_table::cancel_select(void) { + signal_quit(); +} + +void t_connection_table::restart_write_select(void) { + signal_modification_write(); +} + +void t_connection_table::close_idle_connections(unsigned long interval, bool &terminated) { + t_mutex_guard guard(mtx_connections_); + + terminated = terminated_; + + list<t_connection *> expired_connections; + + // Update idle times and find expired connections. + for (list<t_connection *>::iterator it = connections_.begin(); + it != connections_.end(); ++it) + { + unsigned long idle_time = (*it)->increment_idle_time(interval); + if (idle_time >= DUR_IDLE_CONNECTION || terminated) { + // If a registered URI is associated with the connection, then + // it is persistent and it should not be closed. + if (!(*it)->has_registered_uri()) { + expired_connections.push_back(*it); + } + } + } + + // Close expired connections. + for (list<t_connection *>::iterator it = expired_connections.begin(); + it != expired_connections.end(); ++it) + { + unsigned long ipaddr; + unsigned short port; + + (*it)->get_remote_address(ipaddr, port); + log_file->write_header("t_connection_table::close_idle_connections", LOG_NORMAL, LOG_DEBUG); + log_file->write_raw("Close connection to "); + log_file->write_raw(h_ip2str(ipaddr)); + log_file->write_raw(":"); + log_file->write_raw(int2str(port)); + log_file->write_endl(); + log_file->write_footer(); + + remove_connection(*it); + MEMMAN_DELETE(*it); + delete *it; + } +} + +void *connection_timeout_main(void *arg) { + bool terminated = false; + + while (!terminated) { + struct timespec sleep_timer; + + sleep_timer.tv_sec = 1; + sleep_timer.tv_nsec = 0; + nanosleep(&sleep_timer, NULL); + connection_table->close_idle_connections(1000, terminated); + } + + log_file->write_report("Connection timeout handler terminated.", + "::connection_timeout_main"); + + return NULL; +}; |