[Pvfs2-cvs] commit by aching in pvfs2-1/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c

CVS commit program cvs at parl.clemson.edu
Mon Jul 21 14:20:11 EDT 2008


Update of /projects/cvsroot/pvfs2-1/src/io/flow/flowproto-bmi-trove
In directory parlweb1:/tmp/cvs-serv19729/io/flow/flowproto-bmi-trove

Modified Files:
      Tag: locking-branch
	flowproto-multiqueue.c 
Log Message:

Reverse merged and ported to HEAD.


Index: flowproto-multiqueue.c
===================================================================
RCS file: /projects/cvsroot/pvfs2-1/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.113 -r1.113.10.1
--- flowproto-multiqueue.c	16 Aug 2006 01:23:19 -0000	1.113
+++ flowproto-multiqueue.c	21 Jul 2008 18:20:11 -0000	1.113.10.1
@@ -144,17 +144,17 @@ static inline void bmi_send_callback_wra
 {
     struct fp_private_data *flow_data = 
         PRIVATE_FLOW(((struct fp_queue_item*)user_ptr)->parent);
-    gen_mutex_lock(flow_data->parent->flow_mutex);
+    gen_mutex_lock(&flow_data->parent->flow_mutex);
 
     bmi_send_callback_fn(user_ptr, actual_size, error_code, 0);
     if(flow_data->parent->state == FLOW_COMPLETE)
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
         FLOW_CLEANUP(flow_data);
     }
     else
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
     }
 }
 
@@ -164,16 +164,16 @@ static inline void bmi_recv_callback_wra
 {
     struct fp_private_data *flow_data = 
         PRIVATE_FLOW(((struct fp_queue_item*)user_ptr)->parent);
-    gen_mutex_lock(flow_data->parent->flow_mutex);
+    gen_mutex_lock(&flow_data->parent->flow_mutex);
     bmi_recv_callback_fn(user_ptr, actual_size, error_code);
     if(flow_data->parent->state == FLOW_COMPLETE)
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
         FLOW_CLEANUP(flow_data);
     }
     else
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
     }
 }
 
@@ -183,16 +183,16 @@ static inline void trove_read_callback_w
     struct fp_private_data *flow_data = 
         PRIVATE_FLOW(((struct
         result_chain_entry*)user_ptr)->q_item->parent);
-    gen_mutex_lock(flow_data->parent->flow_mutex);
+    gen_mutex_lock(&flow_data->parent->flow_mutex);
     trove_read_callback_fn(user_ptr, error_code);
     if(flow_data->parent->state == FLOW_COMPLETE)
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
         FLOW_CLEANUP(flow_data);
     }
     else
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
     }
 }
 
@@ -202,16 +202,16 @@ static inline void trove_write_callback_
     struct fp_private_data *flow_data = 
         PRIVATE_FLOW(((struct
                        result_chain_entry*)user_ptr)->q_item->parent);
-    gen_mutex_lock(flow_data->parent->flow_mutex);
+    gen_mutex_lock(&flow_data->parent->flow_mutex);
     trove_write_callback_fn(user_ptr, error_code);
     if(flow_data->parent->state == FLOW_COMPLETE)
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
         FLOW_CLEANUP(flow_data);
     }
     else
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
     }
 }
 
@@ -233,16 +233,16 @@ static void mem_to_bmi_callback_wrapper(
 {
     struct fp_private_data *flow_data = 
         PRIVATE_FLOW(((struct fp_queue_item*)user_ptr)->parent);
-    gen_mutex_lock(flow_data->parent->flow_mutex);
+    gen_mutex_lock(&flow_data->parent->flow_mutex);
     mem_to_bmi_callback_fn(user_ptr, actual_size, error_code);
     if(flow_data->parent->state == FLOW_COMPLETE)
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
         FLOW_CLEANUP(flow_data);
     }
     else
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
     }
 }
 
@@ -255,18 +255,17 @@ static void bmi_to_mem_callback_wrapper(
 
     assert(flow_data);
     assert(flow_data->parent);
-    assert(flow_data->parent->flow_mutex);
 
-    gen_mutex_lock(flow_data->parent->flow_mutex);
+    gen_mutex_lock(&flow_data->parent->flow_mutex);
     bmi_to_mem_callback_fn(user_ptr, actual_size, error_code);
     if(flow_data->parent->state == FLOW_COMPLETE)
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
         FLOW_CLEANUP(flow_data);
     }
     else
     {
-        gen_mutex_unlock(flow_data->parent->flow_mutex);
+        gen_mutex_unlock(&flow_data->parent->flow_mutex);
     }
 }
 
@@ -459,8 +458,8 @@ int fp_multiqueue_cancel(flow_descriptor
 {
     struct fp_private_data *flow_data = PRIVATE_FLOW(flow_d);
 
-    gossip_err("Flow proto cancel called on %p\n", flow_d);
-    gen_mutex_lock(flow_data->parent->flow_mutex);
+    gossip_err("%s: flow proto cancel called on %p\n", __func__, flow_d);
+    gen_mutex_lock(&flow_data->parent->flow_mutex);
     /*
       if the flow is already marked as complete, then there is nothing
       to do
@@ -468,14 +467,16 @@ int fp_multiqueue_cancel(flow_descriptor
     if(flow_d->state != FLOW_COMPLETE)
     {
         gossip_debug(GOSSIP_CANCEL_DEBUG,
-            "PINT_flow_cancel() called on active flow, %lld "
-                     "bytes transferred.\n",
-                     lld(flow_d->total_transferred));
+                     "%s: called on active flow, %lld bytes transferred.\n",
+                     __func__, lld(flow_d->total_transferred));
         assert(flow_d->state == FLOW_TRANSMITTING);
-        handle_io_error(-PVFS_ECANCEL, NULL, flow_data);
+        /* NOTE: set flow error class bit so that system interface understands
+         * that this may be a retry-able error
+         */
+        handle_io_error(-(PVFS_ECANCEL|PVFS_ERROR_FLOW), NULL, flow_data);
         if(flow_data->parent->state == FLOW_COMPLETE)
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
             FLOW_CLEANUP(flow_data);
             return(0);
         }
@@ -483,10 +484,10 @@ int fp_multiqueue_cancel(flow_descriptor
     else
     {
         gossip_debug(GOSSIP_CANCEL_DEBUG,
-                     "PINT_flow_cancel() called on already completed "
-                     "flow; doing nothing.\n");
+                     "%s: called on already completed flow; doing nothing.\n",
+                     __func__);
     }
-    gen_mutex_unlock(flow_data->parent->flow_mutex);
+    gen_mutex_unlock(&flow_data->parent->flow_mutex);
 
     return(0);
 }
@@ -583,16 +584,16 @@ int fp_multiqueue_post(flow_descriptor  
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
         }
-        gen_mutex_lock(flow_data->parent->flow_mutex);
+        gen_mutex_lock(&flow_data->parent->flow_mutex);
         bmi_to_mem_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
         if(flow_data->parent->state == FLOW_COMPLETE)
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
             FLOW_CLEANUP(flow_data);
         }
         else
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
         }
     }
     else if(flow_d->src.endpoint_id == MEM_ENDPOINT &&
@@ -609,16 +610,16 @@ int fp_multiqueue_post(flow_descriptor  
             qlist_add_tail(&flow_data->prealloc_array[i].list_link,
                 &flow_data->empty_list);
         }
-        gen_mutex_lock(flow_data->parent->flow_mutex);
+        gen_mutex_lock(&flow_data->parent->flow_mutex);
         mem_to_bmi_callback_fn(&(flow_data->prealloc_array[0]), 0, 0);
         if(flow_data->parent->state == FLOW_COMPLETE)
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
             FLOW_CLEANUP(flow_data);
         }
         else
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
         }
     }
 #ifdef __PVFS2_TROVE_SUPPORT__
@@ -626,7 +627,7 @@ int fp_multiqueue_post(flow_descriptor  
         flow_d->dest.endpoint_id == BMI_ENDPOINT)
     {
         flow_data->initial_posts = flow_d->buffers_per_flow;
-        gen_mutex_lock(flow_data->parent->flow_mutex);
+        gen_mutex_lock(&flow_data->parent->flow_mutex);
         for(i=0; i<flow_d->buffers_per_flow; i++)
         {
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
@@ -635,18 +636,17 @@ int fp_multiqueue_post(flow_descriptor  
             bmi_send_callback_fn(&(flow_data->prealloc_array[i]), 0, 0, 1);
             if(flow_data->dest_last_posted)
             {
-                flow_data->initial_posts = 0;
                 break;
             }
         }
         if(flow_data->parent->state == FLOW_COMPLETE)
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
             FLOW_CLEANUP(flow_data);
         }
         else
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
         }
     }
     else if(flow_d->src.endpoint_id == BMI_ENDPOINT &&
@@ -666,16 +666,16 @@ int fp_multiqueue_post(flow_descriptor  
             &flow_data->prealloc_array[0];
         gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
             "flowproto-multiqueue forcing trove_write_callback_fn.\n");
-        gen_mutex_lock(flow_data->parent->flow_mutex);
+        gen_mutex_lock(&flow_data->parent->flow_mutex);
         trove_write_callback_fn(&(flow_data->prealloc_array[0].result_chain), 0);
         if(flow_data->parent->state == FLOW_COMPLETE)
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
             FLOW_CLEANUP(flow_data);
         }
         else
         {
-            gen_mutex_unlock(flow_data->parent->flow_mutex);
+            gen_mutex_unlock(&flow_data->parent->flow_mutex);
         }
     }
 #endif
@@ -709,6 +709,7 @@ static void bmi_recv_callback_fn(void *u
     PVFS_size bytes_processed = 0;
     void *tmp_buffer;
     void *tmp_user_ptr;
+    int sync_mode = 0;
 
     gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
         "flowproto-multiqueue bmi_recv_callback_fn, error code: %d, flow: %p.\n",
@@ -737,6 +738,15 @@ static void bmi_recv_callback_fn(void *u
         tmp_user_ptr = result_tmp;
         assert(result_tmp->result.bytes);
 
+        if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+        {
+            /* This is the last write operation for this flow.  Set sync
+             * flag if needed
+             */ 
+            sync_mode = get_data_sync_mode(
+                q_item->parent->dest.u.trove.coll_id);
+        }
+
         ret = trove_bstream_write_list(
             q_item->parent->dest.u.trove.coll_id,
             q_item->parent->dest.u.trove.handle,
@@ -747,7 +757,7 @@ static void bmi_recv_callback_fn(void *u
             result_tmp->result.size_array,
             result_tmp->result.segs,
             &q_item->out_size,
-            get_data_sync_mode(q_item->parent->dest.u.trove.coll_id),
+            sync_mode,
             NULL,
             &result_tmp->trove_callback,
             global_trove_context,
@@ -955,7 +965,10 @@ static void trove_read_callback_fn(void 
                 global_bmi_context);
             flow_data->next_seq_to_send++;
             if(q_item->last)
+            {
+                flow_data->initial_posts = 0;
                 flow_data->dest_last_posted = 1;
+            }
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "%s: (post send time) ini posts: %d, pending: %d, last: %d\n",
                 __func__,
@@ -1169,7 +1182,10 @@ static int bmi_send_callback_fn(void *us
          * is no work to do, trigger manually
          */
         if(flow_data->total_bytes_processed == 0)
+        {
+            flow_data->initial_posts = 0;
             flow_data->dest_last_posted = 1;
+        }
     }
 
     if(bytes_processed == 0)
@@ -1213,6 +1229,7 @@ static int bmi_send_callback_fn(void *us
              * to prevent further trying to start other qitems from being
              * posted
              */
+            flow_data->initial_posts = 0;
             flow_data->dest_last_posted = 1;
             return 0;
         }
@@ -1940,6 +1957,7 @@ static void handle_io_error(
     struct fp_private_data *flow_data)
 {
     int ret;
+    char buf[64] = {0};
 
     gossip_debug(GOSSIP_FLOW_PROTO_DEBUG, 
         "flowproto-multiqueue handle_io_error() called for flow %p.\n",
@@ -1949,8 +1967,9 @@ static void handle_io_error(
     if(flow_data->parent->error_code == 0)
     {
         enum flow_endpoint_type src, dest;
-    
-        gossip_err("Flow proto error cleanup started on %p, error_code: %d\n", flow_data->parent, error_code);
+
+        PVFS_strerror_r(error_code, buf, 64);
+        gossip_err("%s: flow proto error cleanup started on %p: %s\n", __func__, flow_data->parent, buf);
 
         flow_data->parent->error_code = error_code;
         if(q_item)
@@ -1967,14 +1986,14 @@ static void handle_io_error(
         {
             ret = cancel_pending_bmi(&flow_data->src_list);
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-                "flowproto-multiqueue canceling %d BMI-mem BMI ops.\n", ret);
+                "flowproto-multiqueue canceled %d bmi-mem BMI ops.\n", ret);
             flow_data->cleanup_pending_count += ret;
         }
         else if (src == MEM_ENDPOINT && dest == BMI_ENDPOINT)
         {
             ret = cancel_pending_bmi(&flow_data->dest_list);
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-                "flowproto-multiqueue canceling %d mem-BMI BMI ops.\n", ret);
+                "flowproto-multiqueue canceled %d mem-bmi BMI ops.\n", ret);
             flow_data->cleanup_pending_count += ret;
         }
         else if (src == TROVE_ENDPOINT && dest == BMI_ENDPOINT)
@@ -1982,21 +2001,21 @@ static void handle_io_error(
             ret = cancel_pending_trove(&flow_data->src_list);
             flow_data->cleanup_pending_count += ret;
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-                "flowproto-multiqueue canceling %d trove-bmi Trove ops.\n", ret);
+                "flowproto-multiqueue canceled %d trove-bmi Trove ops.\n", ret);
             ret = cancel_pending_bmi(&flow_data->dest_list);
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-                "flowproto-multiqueue canceling %d trove-bmi BMI ops.\n", ret);
+                "flowproto-multiqueue canceled %d trove-bmi BMI ops.\n", ret);
             flow_data->cleanup_pending_count += ret;
         }
         else if (src == BMI_ENDPOINT && dest == TROVE_ENDPOINT)
         {
             ret = cancel_pending_bmi(&flow_data->src_list);
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-                "flowproto-multiqueue canceling %d bmi-trove BMI ops.\n", ret);
+                "flowproto-multiqueue canceled %d bmi-trove BMI ops.\n", ret);
             flow_data->cleanup_pending_count += ret;
             ret = cancel_pending_trove(&flow_data->dest_list);
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
-                "flowproto-multiqueue canceling %d bmi-trove Trove ops.\n", ret);
+                "flowproto-multiqueue canceled %d bmi-trove Trove ops.\n", ret);
             flow_data->cleanup_pending_count += ret;
         }
         else
@@ -2004,8 +2023,9 @@ static void handle_io_error(
             /* impossible condition */
             assert(0);
         }
-        gossip_err("Flow proto %p canceling a total of %d BMI or Trove operations\n",
-            flow_data->parent, flow_data->cleanup_pending_count);
+        gossip_err("%s: flow proto %p canceled %d operations, will clean up.\n",
+                   __func__, flow_data->parent,
+                   flow_data->cleanup_pending_count);
     }
     else
     {
@@ -2019,8 +2039,9 @@ static void handle_io_error(
 
     if(flow_data->cleanup_pending_count == 0)
     {
-        gossip_err("Flow proto error cleanup finished %p, error_code: %d\n",
-            flow_data->parent, flow_data->parent->error_code);
+        PVFS_strerror_r(flow_data->parent->error_code, buf, 64);
+        gossip_err("%s: flow proto %p error cleanup finished: %s\n",
+            __func__, flow_data->parent, buf);
 
         /* we are finished, make sure error is marked and state is set */
         assert(flow_data->parent->error_code);



More information about the Pvfs2-cvs mailing list