• Main Page
  • Related Pages
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

Session.h

Go to the documentation of this file.
00001 /* Copyright (C) 2007,2008 by Marc Maurer <uwog@uwog.net>
00002  *
00003  * This program is free software; you can redistribute it and/or
00004  * modify it under the terms of the GNU General Public License
00005  * as published by the Free Software Foundation; either version 2
00006  * of the License, or (at your option) any later version.
00007  *
00008  * This program is distributed in the hope that it will be useful,
00009  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00010  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00011  * GNU General Public License for more details.
00012  *
00013  * You should have received a copy of the GNU General Public License
00014  * along with this program; if not, write to the Free Software
00015  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
00016  * 02111-1307, USA.
00017  */
00018 
00019 #ifndef __SESSION__
00020 #define __SESSION__
00021 
00022 #include <boost/function.hpp>
00023 #include <boost/bind.hpp>
00024 #include <boost/enable_shared_from_this.hpp>
00025 #include <deque>
00026 #include <sync/xp/lock.h>
00027 #include <sync/xp/Synchronizer.h>
00028 
// Upper bound, in bytes, on the payload of a single packet.
// 64MB seems reasonable enough for now...
// The value is parenthesized so that the macro expands to a single operand
// no matter which expression it is used in (e.g. `x % MAX_PACKET_DATA_SIZE`).
#define MAX_PACKET_DATA_SIZE (64*1024*1024)
00031 
00032 class TCPAccountHandler;
00033 
00034 class Session : public Synchronizer, public boost::noncopyable, public boost::enable_shared_from_this<Session>
00035 {
00036 public:
00037     Session(asio::io_service& io_service, boost::function<void (boost::shared_ptr<Session>)> ef)
00038         : Synchronizer(boost::bind(&Session::_signal, this)),
00039         socket(io_service),
00040         queue_protector(),
00041         m_ef(ef)
00042     {
00043     }
00044 
00045     void connect(asio::ip::tcp::resolver::iterator& iterator)
00046     {
00047         socket.connect(*iterator);
00048     }
00049 
00050     // TODO: don't expose this
00051     asio::ip::tcp::socket& getSocket()
00052     {
00053         return socket;
00054     }
00055 
00056     std::string getRemoteAddress()
00057     {
00058         return socket.remote_endpoint().address().to_string();
00059     }
00060 
00061     unsigned short getRemotePort()
00062     {
00063         return socket.remote_endpoint().port();
00064     }
00065 
00066     void push(int size, char* data)
00067     {
00068         {
00069             abicollab::scoped_lock lock(queue_protector);
00070             incoming.push_back( std::pair<int, char*>(size, data) );
00071         }
00072         Synchronizer::signal();
00073     }
00074 
00075     /*
00076         Only called fron the abiword main loop
00077     */
00078     bool pop(int& size, char** data)
00079     {
00080         if (incoming.size() == 0)
00081             return false;
00082         {
00083             abicollab::scoped_lock lock(queue_protector);
00084             std::pair<int, char*> p = incoming.front();
00085             size = p.first;
00086             *data = p.second;
00087             incoming.pop_front();
00088         }
00089         return true;
00090     }
00091 
00092     void asyncReadHeader()
00093     {
00094         UT_DEBUGMSG(("Session::asyncReadHeader()\n"));
00095         packet_data = 0; // just to be sure we'll never touch a datablock we might have read before
00096         asio::async_read(socket,
00097             asio::buffer(&packet_size, 4),
00098             boost::bind(&Session::asyncReadHeaderHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00099     }
00100 
00101     void asyncWrite(int size, const char* data)
00102     {
00103         // TODO: this is a race condition, mutex this
00104         bool writeInProgress = outgoing.size() > 0;
00105 
00106         // FIXME: inefficient memory copy
00107         char* store_data = reinterpret_cast<char*>(g_malloc(size));
00108         memcpy(store_data, data, size);
00109         outgoing.push_back(std::pair<int, char*>(size, store_data));
00110 
00111         if (!writeInProgress)
00112         {
00113             packet_size_write = size;
00114             packet_data_write = store_data;
00115 
00116             UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00117             asio::async_write(socket,
00118                 asio::buffer(&packet_size_write, 4),
00119                 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00120         }
00121     }
00122 
00123     /*
00124         Only called fron the abiword main loop
00125     */
00126     bool isConnected()
00127     {
00128         return socket.is_open();
00129     }
00130 
00131     void disconnect()
00132     {
00133         UT_DEBUGMSG(("Session::disconnect()\n"));
00134         if (socket.is_open())
00135         {
00136             asio::error_code ecs;
00137             socket.shutdown(asio::ip::tcp::socket::shutdown_both, ecs);
00138             if (ecs) {
00139                 UT_DEBUGMSG(("Error shutting down socket: %s\n", ecs.message().c_str()));
00140             }
00141             asio::error_code ecc;
00142             socket.close(ecc);
00143             if (ecc) {
00144                 UT_DEBUGMSG(("Error closing socket: %s\n", ecc.message().c_str()));
00145             }
00146         }
00147         UT_DEBUGMSG(("Socket closed, signalling mainloop\n"));
00148         signal();
00149     }
00150 
00151 private:
00152     void _signal()
00153     {
00154         UT_DEBUGMSG(("Session::_signal()\n"));
00155         m_ef(shared_from_this());
00156     }
00157 
00158     void asyncReadHeaderHandler(const asio::error_code& error,
00159         std::size_t bytes_transferred)
00160     {
00161         if (error)
00162         {
00163             UT_DEBUGMSG(("asyncReadHeaderHandler error: %s\n", error.message().c_str()));
00164             disconnect();
00165             return;
00166         }
00167 
00168         if (bytes_transferred != 4)
00169         {
00170             UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00171             disconnect(); // TODO: should not happen, handle this
00172             return;
00173         }
00174 
00175         if (packet_size < 0 || packet_size > MAX_PACKET_DATA_SIZE)
00176         {
00177             UT_DEBUGMSG(("Packet size (%d bytes) error - min size: 0, max size %d\n", packet_size, MAX_PACKET_DATA_SIZE));
00178             disconnect();
00179             return;
00180         }
00181 
00182         UT_DEBUGMSG(("going to read datablock of length: %d\n", packet_size));
00183         // now continue reading the packet data
00184         packet_data = reinterpret_cast<char*>(g_malloc(packet_size));
00185         asio::async_read(socket,
00186             asio::buffer(packet_data, packet_size),
00187             boost::bind(&Session::asyncReadHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00188     }
00189 
00190     void asyncReadHandler(const asio::error_code& error,
00191         std::size_t bytes_transferred)
00192     {
00193         if (error)
00194         {
00195             UT_DEBUGMSG(("asyncReadHandler generic error\n"));
00196             disconnect();
00197             return;
00198         }
00199 
00200         if (bytes_transferred != std::size_t(packet_size))
00201         {
00202             UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00203             disconnect(); // TODO: should not happen, handle this
00204             return;
00205         }
00206 
00207         push(packet_size, packet_data);
00208         // start over for a new packet
00209         asyncReadHeader();
00210     }
00211 
00212     void asyncWriteHeaderHandler(const asio::error_code& ec)
00213     {
00214         UT_DEBUGMSG(("Session::asyncWriteHeaderHandler()\n"));
00215         if (ec)
00216         {
00217             UT_DEBUGMSG(("asyncWriteHeaderHandler generic error\n"));
00218             disconnect();
00219             return;
00220         }
00221 
00222         // write the packet body
00223         asio::async_write(socket,
00224             asio::buffer(packet_data_write, packet_size_write),
00225             boost::bind(&Session::asyncWriteHandler, shared_from_this(), asio::placeholders::error));
00226     }
00227 
00228     void asyncWriteHandler(const asio::error_code& ec)
00229     {
00230         UT_DEBUGMSG(("Session::asyncWriteHandler()\n"));
00231         FREEP(packet_data_write);
00232         if (ec)
00233         {
00234             UT_DEBUGMSG(("asyncWriteHandler generic error\n"));
00235             disconnect();
00236             return;
00237         }
00238 
00239         // TODO: this is a race condition, mutex this
00240         outgoing.pop_front();
00241         if (outgoing.size() > 0)
00242         {
00243             std::pair<int, char*> p = outgoing.front();
00244             packet_size_write = p.first;
00245             packet_data_write = p.second;
00246 
00247             UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00248 
00249             asio::async_write(socket,
00250                 asio::buffer(&packet_size_write, 4),
00251                 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00252         }
00253     }
00254 
00255     asio::ip::tcp::socket                   socket;
00256     abicollab::mutex                        queue_protector;
00257     std::deque< std::pair<int, char*> >     incoming;
00258     std::deque< std::pair<int, char*> >     outgoing;
00259 
00260     int                                     packet_size; // state needed for async reads
00261     char*                                   packet_data; // state needed for async reads
00262 
00263     int                                     packet_size_write; // state needed for async writes
00264     char*                                   packet_data_write; // state needed for async writes
00265 
00266     boost::function<void (boost::shared_ptr<Session>)>      m_ef;
00267 };
00268 
00269 #endif /* __SESSION__ */

Generated on Sun May 27 2012 for AbiWord by  doxygen 1.7.1