mail[Wesnoth-commits] r44098 - in /trunk/src: network_ana.cpp network_manager_ana.cpp network_manager_ana.hpp


Others Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by billybiset on July 11, 2010 - 02:24:
Author: billynux
Date: Sun Jul 11 02:24:29 2010
New Revision: 44098

URL: http://svn.gna.org/viewcvs/wesnoth?rev=44098&view=rev
Log:
More work on ana's server, implemented send_raw_data.

Modified:
    trunk/src/network_ana.cpp
    trunk/src/network_manager_ana.cpp
    trunk/src/network_manager_ana.hpp

Modified: trunk/src/network_ana.cpp
URL: 
http://svn.gna.org/viewcvs/wesnoth/trunk/src/network_ana.cpp?rev=44098&r1=44097&r2=44098&view=diff
==============================================================================
--- trunk/src/network_ana.cpp (original)
+++ trunk/src/network_ana.cpp Sun Jul 11 02:24:29 2010
@@ -100,7 +100,7 @@
 
 static void check_error()
 {
-    std::cout << "DEBUG: check_error\n";
+//     std::cout << "DEBUG: check_error\n";
 }
 
 namespace {
@@ -275,33 +275,23 @@
 //         std::cout << "DEBUG: Trying to read from connection in " << 
timeout << " milliseconds.\n";
         network::connection read_id = ana_manager.read_from( connection_num, 
cfg, timeout );
 
-        if ( read_id == 0 ) // TODO: check timeout and return 0, or throw if 
error occured
-            return 0;
-        else
-        {
-            std::cout << cfg;
-
-            if (cfg.empty())
-                std::cout << "Empty buffer.\n";
-            else
-                std::cout << "Buffer has something.\n";
-
-            return read_id;
-        }
+        // TODO: check timeout and return 0, or throw if error occured
+
+        return read_id;
     }
 
     connection receive_data(config&           cfg,
                             connection        connection_num,
                             bool*             /*gzipped*/,
-                            bandwidth_in_ptr* /*bandwidth_in*/)
+                            bandwidth_in_ptr* /*bandwidth_in*/) // TODO: use 
this pointer
     {
         return receive_data(cfg,connection_num, size_t(0), NULL); // <- just 
call the previous version without timeouts
     }
 
     connection receive_data(std::vector<char>& buf, bandwidth_in_ptr* 
/*bandwidth_in*/)
     {
-        std::cout << "DEBUG: Trying to read to a buffer of size " << 
buf.size() <<".\n";
-        throw std::runtime_error("TODO:Not implemented receive_data2");
+//         std::cout << "DEBUG: Trying to read to a vector<char>.\n";
+        return ana_manager.read_from_all( buf );
     }
 
     struct bandwidth_stats {
@@ -458,12 +448,13 @@
             return ana_manager.send( connection_num, cfg, gzipped );
     }
 
-    void send_raw_data(const char*        /*buf*/,
-                       int                /*len*/,
-                       connection         /*connection_num*/,
+    void send_raw_data(const char*        buf,
+                       int                len,
+                       connection         connection_num,
                        const std::string& /*packet_type*/)
     {
-        throw std::runtime_error("TODO:Not implemented send_raw_data");
+        ana_manager.send_raw_data( buf, size_t( len ), connection_num );
+//         throw std::runtime_error("TODO:Not implemented send_raw_data");
     }
 
     void process_send_queue(connection, size_t)

Modified: trunk/src/network_manager_ana.cpp
URL: 
http://svn.gna.org/viewcvs/wesnoth/trunk/src/network_manager_ana.cpp?rev=44098&r1=44097&r2=44098&view=diff
==============================================================================
--- trunk/src/network_manager_ana.cpp (original)
+++ trunk/src/network_manager_ana.cpp Sun Jul 11 02:24:29 2010
@@ -368,7 +368,8 @@
     receive_stats_(),
     mutex_(),
     condition_(),
-    buffers_()
+    buffers_(),
+    sender_ids_()
 {
 }
 
@@ -381,7 +382,8 @@
     receive_stats_(),
     mutex_(),
     condition_(),
-    buffers_()
+    buffers_(),
+    sender_ids_()
 {
 }
 
@@ -465,11 +467,14 @@
         return client()->get_stats();
 }
 
-void ana_component::add_buffer(ana::detail::read_buffer buffer)
+void ana_component::add_buffer(ana::detail::read_buffer buffer, ana::net_id 
id)
 {
     {
         boost::lock_guard<boost::mutex> lock(mutex_);
         buffers_.push( buffer );
+
+        if ( is_server_ )
+            sender_ids_.push( id );
     }
     condition_.notify_all();
 }
@@ -487,6 +492,21 @@
 
     return buffer_ret;
 }
+
+network::connection ana_component::oldest_sender_id_still_pending()
+{
+    boost::unique_lock<boost::mutex> lock(mutex_);
+
+    if ( sender_ids_.empty())
+        throw std::runtime_error("No pending buffer.");
+
+    const network::connection id = sender_ids_.front();
+
+    sender_ids_.pop();
+
+    return id;
+}
+
 
 void ana_component::update_receive_stats( size_t buffer_size )
 {
@@ -788,11 +808,16 @@
 size_t ana_network_manager::send( network::connection connection_num , const 
config& cfg, bool /*zipped*/ )
 {
     std::cout << "DEBUG: Single send...\n";
-    ana::net_id id( connection_num );
 
     std::ostringstream out;
     compress_config(cfg, out );
 
+    return send_raw_data( out.str().c_str(), out.str().size(), 
connection_num );
+}
+
+size_t ana_network_manager::send_raw_data( const char* base_char, size_t 
size, network::connection connection_num )
+{
+    ana::net_id id( connection_num );
     ana_component_set::iterator it;
 
     it = std::find_if( components_.begin(), components_.end(),
@@ -800,25 +825,42 @@
 
     if ( it != components_.end())
     {
-        ana_send_handler handler( *it, out.str().size() );
-
-        if ( (*it)->is_client() )
-            (*it)->client()->send( ana::buffer( out.str() ), &handler, 
ana::ZERO_COPY);
-        else
-            (*it)->server()->send_all( ana::buffer( out.str() ), &handler, 
ana::ZERO_COPY);
+        if ( (*it)->is_server() )
+            throw std::runtime_error("Can't send to the server itself.");
+
+        ana_send_handler handler( *it, size );
+        (*it)->client()->send( ana::buffer( base_char, size ), &handler, 
ana::ZERO_COPY);
         handler.wait_completion();
 
         if ( handler.error() )
             return 0;
         else
-            return out.str().size();
+            return size;
     }
     else
-        return 0;
-}
-
-void ana_network_manager::send_all_except(const config& cfg, 
network::connection connection_num)
-{
+    {
+        for (it = components_.begin(); it != components_.end(); ++it)
+        {
+            if ((*it)->is_server())
+            {
+                ana_send_handler handler( *it, size );
+                (*it)->server()->send_one( id, ana::buffer( base_char, size 
), &handler, ana::ZERO_COPY);
+                handler.wait_completion();
+                if ( handler.error() )
+                    return 0;
+                else
+                    return size;
+            }
+        }
+    }
+
+    return 0;
+}
+
+void ana_network_manager::send_all_except(const config& /*cfg*/, 
network::connection /*connection_num*/)
+{
+    throw std::runtime_error("send_all_except is not finished.");
+/*
     std::cout << "DEBUG: send_all_except " << connection_num << "\n";
 
     std::stringstream out;
@@ -842,7 +884,9 @@
             handler.wait_completion();
         }
     }
-}
+*/
+}
+
 network::connection ana_network_manager::read_from_ready_buffer( const 
ana_component_set::iterator& it, config& cfg)
 {
     read_config( (*it)->wait_for_element(), cfg);
@@ -938,9 +982,6 @@
     {
         ana::net_id id( connection_num );
 
-        // comment next debug msg: too much output due to being called 
constantly
-//         std::cout << "DEBUG: Trying to read something from (" << 
connection_num << " = " << id << ")\n";
-
         it = std::find_if( components_.begin(), components_.end(),
                         boost::bind(&ana_component::get_id, _1) == id );
 
@@ -949,6 +990,34 @@
         else
             throw std::runtime_error("Trying a network read from an invalid 
component id.");
     }
+}
+
+network::connection ana_network_manager::read_from_all( std::vector<char>& 
vec)
+{
+    ana_component_set::iterator it;
+
+    if ( components_.empty() )
+        return 0;
+
+    for (it = components_.begin(); it != components_.end(); ++it)
+    {
+        if (  (*it)->new_buffer_ready() )
+        {
+            ana::detail::read_buffer buffer = (*it)->wait_for_element();
+
+            char* ch = buffer->base_char();
+            for (size_t i = 0; i < buffer->size(); ++i)  // copy the buffer
+                vec.push_back( *(ch++) );
+
+            if ( (*it)->is_client() )
+                return (*it)->get_wesnoth_id();
+            else
+                return (*it)->oldest_sender_id_still_pending();
+        }
+    }
+
+    // there wasn't any buffer ready
+    return 0;
 }
 
 network::statistics ana_network_manager::get_send_stats(network::connection 
handle)
@@ -1026,7 +1095,7 @@
         if ( it != components_.end() )
         {
             (*it)->update_receive_stats( buffer->size() );
-            (*it)->add_buffer( buffer );
+            (*it)->add_buffer( buffer, client );
         }
         else
             throw std::runtime_error("Received message from a non connected 
component.");

Modified: trunk/src/network_manager_ana.hpp
URL: 
http://svn.gna.org/viewcvs/wesnoth/trunk/src/network_manager_ana.hpp?rev=44098&r1=44097&r2=44098&view=diff
==============================================================================
--- trunk/src/network_manager_ana.hpp (original)
+++ trunk/src/network_manager_ana.hpp Sun Jul 11 02:24:29 2010
@@ -97,7 +97,7 @@
         const ana::stats* get_stats() const;
 
         /** Push a buffer to the queue of incoming messages. */
-        void add_buffer(ana::detail::read_buffer buffer);
+        void add_buffer(ana::detail::read_buffer buffer, ana::net_id id);
 
         /**
          * Blocking operation to wait for a message in a component.
@@ -105,6 +105,9 @@
          * @returns The buffer that was received first from all pending 
buffers.
          */
         ana::detail::read_buffer wait_for_element();
+
+        /** Returns the network id of the oldest sender of a pending buffer. 
*/
+        network::connection oldest_sender_id_still_pending();
 
         bool new_buffer_ready(); // non const due to mutex blockage
 
@@ -129,6 +132,7 @@
         boost::condition_variable      condition_;
 
         std::queue< ana::detail::read_buffer > buffers_;
+        std::queue< network::connection >      sender_ids_;
 };
 
 typedef std::set<ana_component*> ana_component_set;
@@ -415,6 +419,8 @@
         /** Send data to the component with a given ID. */
         size_t send( network::connection connection_num , const config& cfg, 
bool zipped );
 
+        size_t send_raw_data( const char*, size_t, network::connection);
+
         void send_all_except(const config& cfg, network::connection 
connection_num);
 
         /**
@@ -443,6 +449,8 @@
                                        config&             cfg,
                                        size_t              timeout_ms = 0 );
 
+        network::connection read_from_all( std::vector<char>& );
+
         /**
          * Read a message from a given component or from every one.
          *




Related Messages


Powered by MHonArc, Updated Sun Jul 11 06:40:12 2010