00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef __SESSION__
00020 #define __SESSION__
00021
00022 #include <boost/function.hpp>
00023 #include <boost/bind.hpp>
00024 #include <boost/enable_shared_from_this.hpp>
00025 #include <boost/noncopyable.hpp>
00026 #include <deque>
00027 #include <sync/xp/lock.h>
00028 #include <sync/xp/Synchronizer.h>
00029
00030
00031 #define MAX_PACKET_DATA_SIZE 64*1024*1024
00032
00033 class TCPAccountHandler;
00034
00035 class Session : public Synchronizer, public boost::noncopyable, public boost::enable_shared_from_this<Session>
00036 {
00037 public:
00038 Session(asio::io_service& io_service, boost::function<void (boost::shared_ptr<Session>)> ef)
00039 : Synchronizer(boost::bind(&Session::_signal, this)),
00040 socket(io_service),
00041 queue_protector(),
00042 m_ef(ef)
00043 {
00044 }
00045
00046 void connect(asio::ip::tcp::resolver::iterator& iterator)
00047 {
00048 socket.connect(*iterator);
00049 }
00050
00051
00052 asio::ip::tcp::socket& getSocket()
00053 {
00054 return socket;
00055 }
00056
00057 std::string getRemoteAddress()
00058 {
00059 return socket.remote_endpoint().address().to_string();
00060 }
00061
00062 unsigned short getRemotePort()
00063 {
00064 return socket.remote_endpoint().port();
00065 }
00066
00067 void push(int size, char* data)
00068 {
00069 {
00070 abicollab::scoped_lock lock(queue_protector);
00071 incoming.push_back( std::pair<int, char*>(size, data) );
00072 }
00073 Synchronizer::signal();
00074 }
00075
00076
00077
00078
00079 bool pop(int& size, char** data)
00080 {
00081 if (incoming.size() == 0)
00082 return false;
00083 {
00084 abicollab::scoped_lock lock(queue_protector);
00085 std::pair<int, char*> p = incoming.front();
00086 size = p.first;
00087 *data = p.second;
00088 incoming.pop_front();
00089 }
00090 return true;
00091 }
00092
00093 void asyncReadHeader()
00094 {
00095 UT_DEBUGMSG(("Session::asyncReadHeader()\n"));
00096 packet_data = 0;
00097 asio::async_read(socket,
00098 asio::buffer(&packet_size, 4),
00099 boost::bind(&Session::asyncReadHeaderHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00100 }
00101
00102 void asyncWrite(int size, const char* data)
00103 {
00104
00105 bool writeInProgress = outgoing.size() > 0;
00106
00107
00108 char* store_data = reinterpret_cast<char*>(g_malloc(size));
00109 memcpy(store_data, data, size);
00110 outgoing.push_back(std::pair<int, char*>(size, store_data));
00111
00112 if (!writeInProgress)
00113 {
00114 packet_size_write = size;
00115 packet_data_write = store_data;
00116
00117 UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00118 asio::async_write(socket,
00119 asio::buffer(&packet_size_write, 4),
00120 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00121 }
00122 }
00123
00124
00125
00126
00127 bool isConnected()
00128 {
00129 return socket.is_open();
00130 }
00131
00132 void disconnect()
00133 {
00134 UT_DEBUGMSG(("Session::disconnect()\n"));
00135 if (socket.is_open())
00136 {
00137 asio::error_code ecs;
00138 socket.shutdown(asio::ip::tcp::socket::shutdown_both, ecs);
00139 if (ecs) {
00140 UT_DEBUGMSG(("Error shutting down socket: %s\n", ecs.message().c_str()));
00141 }
00142 asio::error_code ecc;
00143 socket.close(ecc);
00144 if (ecc) {
00145 UT_DEBUGMSG(("Error closing socket: %s\n", ecc.message().c_str()));
00146 }
00147 }
00148 UT_DEBUGMSG(("Socket closed, signalling mainloop\n"));
00149 signal();
00150 }
00151
00152 private:
00153 void _signal()
00154 {
00155 UT_DEBUGMSG(("Session::_signal()\n"));
00156 m_ef(shared_from_this());
00157 }
00158
00159 void asyncReadHeaderHandler(const asio::error_code& error,
00160 std::size_t bytes_transferred)
00161 {
00162 if (error)
00163 {
00164 UT_DEBUGMSG(("asyncReadHeaderHandler error: %s\n", error.message().c_str()));
00165 disconnect();
00166 return;
00167 }
00168
00169 if (bytes_transferred != 4)
00170 {
00171 UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00172 disconnect();
00173 return;
00174 }
00175
00176 if (packet_size < 0 || packet_size > MAX_PACKET_DATA_SIZE)
00177 {
00178 UT_DEBUGMSG(("Packet size (%d bytes) error - min size: 0, max size %d\n", packet_size, MAX_PACKET_DATA_SIZE));
00179 disconnect();
00180 return;
00181 }
00182
00183 UT_DEBUGMSG(("going to read datablock of length: %d\n", packet_size));
00184
00185 packet_data = reinterpret_cast<char*>(g_malloc(packet_size));
00186 asio::async_read(socket,
00187 asio::buffer(packet_data, packet_size),
00188 boost::bind(&Session::asyncReadHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00189 }
00190
00191 void asyncReadHandler(const asio::error_code& error,
00192 std::size_t bytes_transferred)
00193 {
00194 if (error)
00195 {
00196 UT_DEBUGMSG(("asyncReadHandler generic error\n"));
00197 disconnect();
00198 return;
00199 }
00200
00201 if (bytes_transferred != std::size_t(packet_size))
00202 {
00203 UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00204 disconnect();
00205 return;
00206 }
00207
00208 push(packet_size, packet_data);
00209
00210 asyncReadHeader();
00211 }
00212
00213 void asyncWriteHeaderHandler(const asio::error_code& ec)
00214 {
00215 UT_DEBUGMSG(("Session::asyncWriteHeaderHandler()\n"));
00216 if (ec)
00217 {
00218 UT_DEBUGMSG(("asyncWriteHeaderHandler generic error\n"));
00219 disconnect();
00220 return;
00221 }
00222
00223
00224 asio::async_write(socket,
00225 asio::buffer(packet_data_write, packet_size_write),
00226 boost::bind(&Session::asyncWriteHandler, shared_from_this(), asio::placeholders::error));
00227 }
00228
00229 void asyncWriteHandler(const asio::error_code& ec)
00230 {
00231 UT_DEBUGMSG(("Session::asyncWriteHandler()\n"));
00232 FREEP(packet_data_write);
00233 if (ec)
00234 {
00235 UT_DEBUGMSG(("asyncWriteHandler generic error\n"));
00236 disconnect();
00237 return;
00238 }
00239
00240
00241 outgoing.pop_front();
00242 if (outgoing.size() > 0)
00243 {
00244 std::pair<int, char*> p = outgoing.front();
00245 packet_size_write = p.first;
00246 packet_data_write = p.second;
00247
00248 UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00249
00250 asio::async_write(socket,
00251 asio::buffer(&packet_size_write, 4),
00252 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00253 }
00254 }
00255
00256 asio::ip::tcp::socket socket;
00257 abicollab::mutex queue_protector;
00258 std::deque< std::pair<int, char*> > incoming;
00259 std::deque< std::pair<int, char*> > outgoing;
00260
00261 int packet_size;
00262 char* packet_data;
00263
00264 int packet_size_write;
00265 char* packet_data_write;
00266
00267 boost::function<void (boost::shared_ptr<Session>)> m_ef;
00268 };
00269
00270 #endif