SocketManager.cpp

Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2006-2010 Jacek Sieka, arnetheduck on gmail point com
00003  *
00004  * This program is free software; you can redistribute it and/or modify
00005  * it under the terms of the GNU General Public License as published by
00006  * the Free Software Foundation; either version 2 of the License, or
00007  * (at your option) any later version.
00008  *
00009  * This program is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  *
00014  * You should have received a copy of the GNU General Public License
00015  * along with this program; if not, write to the Free Software
00016  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00017  */
00018 
00019 #include "adchpp.h"
00020 
00021 #include "SocketManager.h"
00022 
00023 #include "LogManager.h"
00024 #include "TimerManager.h"
00025 #include "ManagedSocket.h"
00026 #include "ServerInfo.h"
00027 #include "SimpleXML.h"
00028 
00029 #ifdef HAVE_OPENSSL
00030 #include <boost/asio/ssl.hpp>
00031 #endif
00032 #include <boost/date_time/posix_time/time_parsers.hpp>
00033 
00034 namespace adchpp {
00035 
00036 using namespace std;
00037 using namespace std::placeholders;
00038 using namespace boost::asio;
00039 using boost::system::error_code;
00040 using boost::system::system_error;
00041 
00042 SocketManager::SocketManager()  {
00043 
00044 }
00045 
00046 SocketManager::~SocketManager() {
00047 
00048 }
00049 
00050 SocketManager* SocketManager::instance = 0;
00051 const string SocketManager::className = "SocketManager";
00052 
00053 template<typename T>
00054 class SocketStream : public AsyncStream {
00055 public:
00056     template<typename X>
00057     SocketStream(X& x) : sock(x) { }
00058 
00059     template<typename X, typename Y>
00060     SocketStream(X& x, Y& y) : sock(x, y) {
00061     }
00062 
00063     virtual size_t available() {
00064         return sock.lowest_layer().available();
00065     }
00066 
00067     virtual void prepareRead(const BufferPtr& buf, const Handler& handler) {
00068         if(buf) {
00069             sock.async_read_some(boost::asio::buffer(buf->data(), buf->size()), handler);
00070         } else {
00071             sock.async_read_some(boost::asio::null_buffers(), handler);
00072         }
00073     }
00074 
00075     virtual size_t read(const BufferPtr& buf) {
00076         return sock.read_some(boost::asio::buffer(buf->data(), buf->size()));
00077     }
00078 
00079     virtual void write(const BufferList& bufs, const Handler& handler) {
00080         if(bufs.size() == 1) {
00081             sock.async_write_some(boost::asio::buffer(bufs[0]->data(), bufs[0]->size()), handler);
00082         } else {
00083             size_t n = std::min(bufs.size(), static_cast<size_t>(64));
00084             std::vector<boost::asio::const_buffer> buffers;
00085             buffers.reserve(n);
00086 
00087             const size_t maxBytes = 1024;
00088 
00089             for(size_t i = 0, total = 0; i < n && total < maxBytes; ++i) {
00090                 size_t left = maxBytes - total;
00091                 size_t bytes = min(bufs[i]->size(), left);
00092                 buffers.push_back(boost::asio::const_buffer(bufs[i]->data(), bytes));
00093                 total += bytes;
00094             }
00095 
00096             sock.async_write_some(buffers, handler);
00097         }
00098     }
00099 
00100     virtual void close() {
00101         // Abortive close, just go away...
00102         if(sock.lowest_layer().is_open()) {
00103             boost::system::error_code ec;
00104             sock.lowest_layer().set_option(socket_base::linger(true, 10), ec); // Ignore errors
00105             sock.lowest_layer().close(ec); // Ignore errors
00106         }
00107     }
00108 
00109     T sock;
00110 };
00111 
00112 typedef SocketStream<ip::tcp::socket> SimpleSocketStream;
00113 
00114 #ifdef HAVE_OPENSSL
00115 typedef SocketStream<ssl::stream<ip::tcp::socket> > TLSSocketStream;
00116 #endif
00117 
00118 // Default buffer size used for SO_RCVBUF/SO_SNDBUF
00119 // We don't need a large one since we generally deal with very short messages
00120 static const int SOCKET_BUFFER_SIZE = 1024;
00121 
00122 class SocketFactory : public enable_shared_from_this<SocketFactory> {
00123 public:
00124     SocketFactory(io_service& io_, const SocketManager::IncomingHandler& handler_, const ServerInfoPtr& info) :
00125         io(io_),
00126         acceptor(io_, ip::tcp::endpoint(boost::asio::ip::tcp::v4(), info->port)),
00127         serverInfo(info),
00128         handler(handler_)
00129     {
00130 #ifdef HAVE_OPENSSL
00131         if(info->secure()) {
00132             context = make_shared<boost::asio::ssl::context>(io, ssl::context::tlsv1_server);
00133             context->set_options(
00134                 boost::asio::ssl::context::no_sslv2
00135                 | boost::asio::ssl::context::no_sslv3
00136                 | boost::asio::ssl::context::single_dh_use);
00137             //context->set_password_callback(boost::bind(&server::get_password, this));
00138             context->use_certificate_chain_file(info->TLSParams.cert);
00139             context->use_private_key_file(info->TLSParams.pkey, boost::asio::ssl::context::pem);
00140             context->use_tmp_dh_file(info->TLSParams.dh);
00141         }
00142 #endif
00143     }
00144 
00145     void prepareAccept() {
00146         if(!SocketManager::getInstance()->work.get()) {
00147             return;
00148         }
00149 #ifdef HAVE_OPENSSL
00150         if(serverInfo->secure()) {
00151             shared_ptr<TLSSocketStream> s = make_shared<TLSSocketStream>(io, *context);
00152             ManagedSocketPtr socket = make_shared<ManagedSocket>(s);
00153             acceptor.async_accept(s->sock.lowest_layer(), std::bind(&SocketFactory::prepareHandshake, shared_from_this(), std::placeholders::_1, socket));
00154         } else {
00155 #endif
00156             shared_ptr<SimpleSocketStream> s = make_shared<SimpleSocketStream>(io);
00157             ManagedSocketPtr socket = make_shared<ManagedSocket>(s);
00158             acceptor.async_accept(s->sock.lowest_layer(), std::bind(&SocketFactory::handleAccept, shared_from_this(), std::placeholders::_1, socket));
00159 #ifdef HAVE_OPENSSL
00160         }
00161 #endif
00162     }
00163 
00164 #ifdef HAVE_OPENSSL
00165     void prepareHandshake(const error_code& ec, const ManagedSocketPtr& socket) {
00166         if(!ec) {
00167             TLSSocketStream* tls = static_cast<TLSSocketStream*>(socket->sock.get());
00168             // By default, we linger for 30 seconds (this will happen when the stream
00169             // is deallocated without calling close first)
00170             tls->sock.lowest_layer().set_option(socket_base::linger(true, 30));
00171             tls->sock.lowest_layer().set_option(boost::asio::socket_base::receive_buffer_size(SOCKET_BUFFER_SIZE));
00172             tls->sock.lowest_layer().set_option(boost::asio::socket_base::send_buffer_size(SOCKET_BUFFER_SIZE));
00173             try {
00174                 socket->setIp(tls->sock.lowest_layer().remote_endpoint().address().to_string());
00175             } catch(const system_error&) { }
00176             tls->sock.async_handshake(ssl::stream_base::server, std::bind(&SocketFactory::completeAccept, shared_from_this(), std::placeholders::_1, socket));
00177         }
00178 
00179         prepareAccept();
00180     }
00181 #endif
00182 
00183     void handleAccept(const error_code& ec, const ManagedSocketPtr& socket) {
00184         if(!ec) {
00185             shared_ptr<SimpleSocketStream> s = SHARED_PTR_NS::static_pointer_cast<SimpleSocketStream>(socket->sock);
00186             // By default, we linger for 30 seconds (this will happen when the stream
00187             // is deallocated without calling close first)
00188             s->sock.lowest_layer().set_option(socket_base::linger(true, 30));
00189             s->sock.lowest_layer().set_option(boost::asio::socket_base::receive_buffer_size(SOCKET_BUFFER_SIZE));
00190             s->sock.lowest_layer().set_option(boost::asio::socket_base::send_buffer_size(SOCKET_BUFFER_SIZE));
00191             try {
00192                 socket->setIp(s->sock.lowest_layer().remote_endpoint().address().to_string());
00193             } catch(const system_error&) { }
00194         }
00195 
00196         completeAccept(ec, socket);
00197 
00198         prepareAccept();
00199     }
00200 
00201     void completeAccept(const error_code& ec, const ManagedSocketPtr& socket) {
00202         handler(socket);
00203         socket->completeAccept(ec);
00204     }
00205 
00206     void close() { acceptor.close(); }
00207 
00208     io_service& io;
00209     ip::tcp::acceptor acceptor;
00210     ServerInfoPtr serverInfo;
00211     SocketManager::IncomingHandler handler;
00212 
00213 #ifdef HAVE_OPENSSL
00214     shared_ptr<boost::asio::ssl::context> context;
00215 #endif
00216 
00217 };
00218 
00219 int SocketManager::run() {
00220     LOG(SocketManager::className, "Starting");
00221 
00222     for(std::vector<ServerInfoPtr>::iterator i = servers.begin(), iend = servers.end(); i != iend; ++i) {
00223         const ServerInfoPtr& si = *i;
00224 
00225         try {
00226             SocketFactoryPtr factory = make_shared<SocketFactory>(io, incomingHandler, si);
00227             factory->prepareAccept();
00228             factories.push_back(factory);
00229         } catch(const system_error& se) {
00230             LOG(SocketManager::className, "Error while loading server on port " + Util::toString(si->port) +": " + se.what());
00231         }
00232     }
00233 
00234     io.run();
00235 
00236     io.reset();
00237 
00238     return 0;
00239 }
00240 
00241 void SocketManager::closeFactories() {
00242     for(std::vector<SocketFactoryPtr>::iterator i = factories.begin(), iend = factories.end(); i != iend; ++i) {
00243         (*i)->close();
00244     }
00245     factories.clear();
00246 }
00247 
00248 void SocketManager::addJob(const Callback& callback) throw() {
00249     io.post(callback);
00250 }
00251 
00252 void SocketManager::addJob(const long msec, const Callback& callback) {
00253     addJob(boost::posix_time::milliseconds(msec), callback);
00254 }
00255 
00256 void SocketManager::addJob(const std::string& time, const Callback& callback) {
00257     addJob(boost::posix_time::duration_from_string(time), callback);
00258 }
00259 
00260 SocketManager::Callback SocketManager::addTimedJob(const long msec, const Callback& callback) {
00261     return addTimedJob(boost::posix_time::milliseconds(msec), callback);
00262 }
00263 
00264 SocketManager::Callback SocketManager::addTimedJob(const std::string& time, const Callback& callback) {
00265     return addTimedJob(boost::posix_time::duration_from_string(time), callback);
00266 }
00267 
00268 void SocketManager::addJob(const deadline_timer::duration_type& duration, const Callback& callback) {
00269     setTimer(make_shared<timer_ptr::element_type>(io, duration), deadline_timer::duration_type(), new Callback(callback));
00270 }
00271 
00272 SocketManager::Callback SocketManager::addTimedJob(const deadline_timer::duration_type& duration, const Callback& callback) {
00273     timer_ptr timer = make_shared<timer_ptr::element_type>(io, duration);
00274     Callback* pCallback = new Callback(callback); // create a separate callback on the heap to avoid shutdown crashes
00275     setTimer(timer, duration, pCallback);
00276     return std::bind(&SocketManager::cancelTimer, this, timer, pCallback);
00277 }
00278 
00279 void SocketManager::setTimer(timer_ptr timer, const deadline_timer::duration_type& duration, Callback* callback) {
00280     timer->async_wait(std::bind(&SocketManager::handleWait, this, timer, duration, std::placeholders::_1, callback));
00281 }
00282 
00283 void SocketManager::handleWait(timer_ptr timer, const deadline_timer::duration_type& duration, const error_code& error, Callback* callback) {
00284     bool run_on = duration.ticks();
00285 
00286     if(!error) {
00287         if(run_on) {
00288             // re-schedule the timer
00289             timer->expires_at(timer->expires_at() + duration);
00290             setTimer(timer, duration, callback);
00291         }
00292 
00293         addJob(*callback);
00294     }
00295 
00296     if(!run_on) {
00297         // this timer was only running once, so it has no cancel function
00298         delete callback;
00299     }
00300 }
00301 
00302 void SocketManager::cancelTimer(timer_ptr timer, Callback* callback) {
00303     if(timer.get()) {
00304         error_code ec;
00305         timer->cancel(ec);
00306     }
00307 
00308     delete callback;
00309 }
00310 
00311 void SocketManager::startup() throw(ThreadException) {
00312     work.reset(new io_service::work(io));
00313     start();
00314 }
00315 
00316 void SocketManager::shutdown() {
00317     work.reset();
00318     addJob(std::bind(&SocketManager::closeFactories, this));
00319     io.stop();
00320     join();
00321 }
00322 
00323 void SocketManager::onLoad(const SimpleXML& xml) throw() {
00324     servers.clear();
00325 }
00326 
00327 }
Generated on Sat Nov 27 23:37:53 2010 for adchpp by  doxygen 1.6.3