[Pvfs2-cvs] commit by nlmills in pvfs2/src/io/flow/flowproto-bmi-trove: flowproto-multiqueue.c

CVS commit program cvs at parl.clemson.edu
Tue Aug 25 13:56:16 EDT 2009


Update of /anoncvs/pvfs2/src/io/flow/flowproto-bmi-trove
In directory parlweb1:/tmp/cvs-serv5511/src/io/flow/flowproto-bmi-trove

Modified Files:
      Tag: cu-security-branch
	flowproto-multiqueue.c 
Log Message:
merged in changes from summer at LANL


Index: flowproto-multiqueue.c
===================================================================
RCS file: /anoncvs/pvfs2/src/io/flow/flowproto-bmi-trove/flowproto-multiqueue.c,v
diff -p -u -r1.117.10.1 -r1.117.10.2
--- flowproto-multiqueue.c	16 May 2008 15:15:41 -0000	1.117.10.1
+++ flowproto-multiqueue.c	25 Aug 2009 17:56:16 -0000	1.117.10.2
@@ -106,7 +106,8 @@ static void handle_io_error(
 static int cancel_pending_bmi(
     struct qlist_head *list);
 static int cancel_pending_trove(
-    struct qlist_head *list);
+    struct qlist_head *list, 
+    TROVE_coll_id coll_id);
 
 #ifdef __PVFS2_TROVE_SUPPORT__
 typedef struct
@@ -473,6 +474,7 @@ int fp_multiqueue_cancel(flow_descriptor
         /* NOTE: set flow error class bit so that system interface understands
          * that this may be a retry-able error
          */
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(-(PVFS_ECANCEL|PVFS_ERROR_FLOW), NULL, flow_data);
         if(flow_data->parent->state == FLOW_COMPLETE)
         {
@@ -709,6 +711,7 @@ static void bmi_recv_callback_fn(void *u
     PVFS_size bytes_processed = 0;
     void *tmp_buffer;
     void *tmp_user_ptr;
+    int sync_mode = 0;
 
     gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
         "flowproto-multiqueue bmi_recv_callback_fn, error code: %d, flow: %p.\n",
@@ -718,6 +721,7 @@ static void bmi_recv_callback_fn(void *u
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(error_code, q_item, flow_data);
         return;
     }
@@ -737,6 +741,15 @@ static void bmi_recv_callback_fn(void *u
         tmp_user_ptr = result_tmp;
         assert(result_tmp->result.bytes);
 
+        if(PINT_REQUEST_DONE(q_item->parent->file_req_state))
+        {
+            /* This is the last write operation for this flow.  Set sync
+             * flag if needed
+             */ 
+            sync_mode = get_data_sync_mode(
+                q_item->parent->dest.u.trove.coll_id);
+        }
+
         ret = trove_bstream_write_list(
             q_item->parent->dest.u.trove.coll_id,
             q_item->parent->dest.u.trove.handle,
@@ -747,16 +760,18 @@ static void bmi_recv_callback_fn(void *u
             result_tmp->result.size_array,
             result_tmp->result.segs,
             &q_item->out_size,
-            get_data_sync_mode(q_item->parent->dest.u.trove.coll_id),
+            sync_mode,
             NULL,
             &result_tmp->trove_callback,
             global_trove_context,
-            &result_tmp->posted_id);
+            &result_tmp->posted_id,
+            q_item->parent->hints);
 
         result_tmp = result_tmp->next;
 
         if(ret < 0)
         {
+            gossip_err("%s: I/O error occurred\n", __func__);
             handle_io_error(ret, q_item, flow_data);
             return;
         }
@@ -782,7 +797,8 @@ static void bmi_recv_callback_fn(void *u
         if(!q_item->buffer)
         {
             /* if the q_item has not been used, allocate a buffer */
-            q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
+            q_item->buffer = BMI_memalloc(
+                q_item->parent->src.u.bmi.address,
                 q_item->parent->buffer_size, BMI_RECV);
             /* TODO: error handling */
             assert(q_item->buffer);
@@ -850,19 +866,26 @@ static void bmi_recv_callback_fn(void *u
 
         flow_data->total_bytes_processed += bytes_processed;
 
+        gossip_debug(GOSSIP_DIRECTIO_DEBUG,
+                     "offset %llu, buffer ptr: %p\n",
+                     llu(q_item->result_chain.result.offset_array[0]),
+                     q_item->buffer);
+
         /* TODO: what if we recv less than expected? */
         ret = BMI_post_recv(&q_item->posted_id,
-            q_item->parent->src.u.bmi.address,
-            q_item->buffer,
-            flow_data->parent->buffer_size,
-            &tmp_actual_size,
+                            q_item->parent->src.u.bmi.address,
+                            ((char *)q_item->buffer),
+                            flow_data->parent->buffer_size,
+                            &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
             &q_item->bmi_callback,
-            global_bmi_context);
-        
+            global_bmi_context,
+            (bmi_hint)q_item->parent->hints);
+
         if(ret < 0)
         {
+            gossip_err("%s: I/O error occurred\n", __func__);
             handle_io_error(ret, q_item, flow_data);
             return;
         }
@@ -873,7 +896,7 @@ static void bmi_recv_callback_fn(void *u
             bmi_recv_callback_fn(q_item, tmp_actual_size, 0);
         }
     }
-        
+
     return;
 }
 
@@ -905,6 +928,7 @@ static void trove_read_callback_fn(void 
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(error_code, q_item, flow_data);
         return;
     }
@@ -952,7 +976,8 @@ static void trove_read_callback_fn(void 
                 BMI_PRE_ALLOC,
                 q_item->parent->tag,
                 &q_item->bmi_callback,
-                global_bmi_context);
+                global_bmi_context,
+                (bmi_hint)q_item->parent->hints);
             flow_data->next_seq_to_send++;
             if(q_item->last)
             {
@@ -973,6 +998,7 @@ static void trove_read_callback_fn(void 
 
         if(ret < 0)
         {
+            gossip_err("%s: I/O error occurred\n", __func__);
             handle_io_error(ret, q_item, flow_data);
             return;
         }
@@ -1026,6 +1052,7 @@ static int bmi_send_callback_fn(void *us
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(error_code, q_item, flow_data);
         if(flow_data->parent->state == FLOW_COMPLETE)
             return(1);
@@ -1091,7 +1118,8 @@ static int bmi_send_callback_fn(void *us
     else
     {
         /* if the q_item has not been used, allocate a buffer */
-        q_item->buffer = BMI_memalloc(q_item->parent->dest.u.bmi.address,
+        q_item->buffer = BMI_memalloc(
+            q_item->parent->dest.u.bmi.address,
             q_item->parent->buffer_size, BMI_SEND);
         /* TODO: error handling */
         assert(q_item->buffer);
@@ -1254,12 +1282,14 @@ static int bmi_send_callback_fn(void *us
             NULL,
             &result_tmp->trove_callback,
             global_trove_context,
-            &result_tmp->posted_id);
+            &result_tmp->posted_id,
+            flow_data->parent->hints);
 
         result_tmp = result_tmp->next;
 
         if(ret < 0)
         {
+            gossip_err("%s: I/O error occurred\n", __func__);
             handle_io_error(ret, q_item, flow_data);
             if(flow_data->parent->state == FLOW_COMPLETE)
                 return(1);
@@ -1304,6 +1334,7 @@ static void trove_write_callback_fn(void
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(error_code, q_item, flow_data);
         return;
     }
@@ -1359,7 +1390,8 @@ static void trove_write_callback_fn(void
     else
     {
         /* if the q_item has not been used, allocate a buffer */
-        q_item->buffer = BMI_memalloc(q_item->parent->src.u.bmi.address,
+        q_item->buffer = BMI_memalloc(
+            q_item->parent->src.u.bmi.address,
             q_item->parent->buffer_size, BMI_RECV);
         /* TODO: error handling */
         assert(q_item->buffer);
@@ -1443,19 +1475,25 @@ static void trove_write_callback_fn(void
             return;
         }
 
+        gossip_debug(GOSSIP_DIRECTIO_DEBUG,
+                     "offset %llu, buffer ptr: %p\n",
+                     llu(q_item->result_chain.result.offset_array[0]),
+                     q_item->buffer);
         /* TODO: what if we recv less than expected? */
         ret = BMI_post_recv(&q_item->posted_id,
             q_item->parent->src.u.bmi.address,
-            q_item->buffer,
+	    ((char *)q_item->buffer),
             flow_data->parent->buffer_size,
             &tmp_actual_size,
             BMI_PRE_ALLOC,
             q_item->parent->tag,
             &q_item->bmi_callback,
-            global_bmi_context);
-        
+            global_bmi_context,
+            (bmi_hint)q_item->parent->hints);
+
         if(ret < 0)
         {
+            gossip_err("%s: I/O error occurred\n", __func__);
             handle_io_error(ret, q_item, flow_data);
             return;
         }
@@ -1495,10 +1533,10 @@ static void cleanup_buffers(struct fp_pr
         {
             if(flow_data->prealloc_array[i].buffer)
             {
-                BMI_memfree(flow_data->parent->src.u.bmi.address,
-                    flow_data->prealloc_array[i].buffer,
-                    flow_data->parent->buffer_size,
-                    BMI_RECV);
+		    BMI_memfree(flow_data->parent->src.u.bmi.address,
+				    flow_data->prealloc_array[i].buffer,
+				    flow_data->parent->buffer_size,
+				    BMI_RECV);
             }
             result_tmp = &(flow_data->prealloc_array[i].result_chain);
             do{
@@ -1583,6 +1621,7 @@ static void mem_to_bmi_callback_fn(void 
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(error_code, q_item, flow_data);
         return;
     }
@@ -1632,7 +1671,8 @@ static void mem_to_bmi_callback_fn(void 
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->dest.u.bmi.address,
-                flow_data->parent->buffer_size, BMI_SEND);
+                flow_data->parent->buffer_size,
+                BMI_SEND);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
@@ -1718,10 +1758,12 @@ static void mem_to_bmi_callback_fn(void 
         buffer_type,
         q_item->parent->tag,
         &q_item->bmi_callback,
-        global_bmi_context);
+        global_bmi_context,
+        (bmi_hint)q_item->parent->hints);
 
     if(ret < 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(ret, q_item, flow_data);
         return;
     }
@@ -1766,6 +1808,7 @@ static void bmi_to_mem_callback_fn(void 
 
     if(error_code != 0 || flow_data->parent->error_code != 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(error_code, q_item, flow_data);
         return;
     }
@@ -1864,7 +1907,8 @@ static void bmi_to_mem_callback_fn(void 
         {
             flow_data->intermediate = BMI_memalloc(
                 flow_data->parent->src.u.bmi.address,
-                flow_data->parent->buffer_size, BMI_RECV);
+                flow_data->parent->buffer_size,
+                BMI_RECV);
             /* TODO: error handling */
             assert(flow_data->intermediate);
         }
@@ -1915,10 +1959,12 @@ static void bmi_to_mem_callback_fn(void 
         buffer_type,
         q_item->parent->tag,
         &q_item->bmi_callback,
-        global_bmi_context);
+        global_bmi_context,
+        (bmi_hint)q_item->parent->hints);
 
     if(ret < 0)
     {
+        gossip_err("%s: I/O error occurred\n", __func__);
         handle_io_error(ret, q_item, flow_data);
         return;
     }
@@ -1988,7 +2034,7 @@ static void handle_io_error(
         }
         else if (src == TROVE_ENDPOINT && dest == BMI_ENDPOINT)
         {
-            ret = cancel_pending_trove(&flow_data->src_list);
+            ret = cancel_pending_trove(&flow_data->src_list, flow_data->parent->src.u.trove.coll_id);
             flow_data->cleanup_pending_count += ret;
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "flowproto-multiqueue canceled %d trove-bmi Trove ops.\n", ret);
@@ -2003,7 +2049,7 @@ static void handle_io_error(
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "flowproto-multiqueue canceled %d bmi-trove BMI ops.\n", ret);
             flow_data->cleanup_pending_count += ret;
-            ret = cancel_pending_trove(&flow_data->dest_list);
+            ret = cancel_pending_trove(&flow_data->dest_list, flow_data->parent->dest.u.trove.coll_id);
             gossip_debug(GOSSIP_FLOW_PROTO_DEBUG,
                 "flowproto-multiqueue canceled %d bmi-trove Trove ops.\n", ret);
             flow_data->cleanup_pending_count += ret;
@@ -2086,7 +2132,7 @@ static int cancel_pending_bmi(struct qli
  *
  * returns the number of operations that were canceled 
  */
-static int cancel_pending_trove(struct qlist_head *list)
+static int cancel_pending_trove(struct qlist_head *list, TROVE_coll_id coll_id)
 {
     struct qlist_head *tmp_link;
     struct fp_queue_item *q_item = NULL;
@@ -2111,7 +2157,7 @@ static int cancel_pending_trove(struct q
                 count++;
                 ret = PINT_thread_mgr_trove_cancel(
                     old_result_tmp->posted_id,
-                    q_item->parent->src.u.trove.coll_id,
+                    coll_id,
                     &old_result_tmp->trove_callback);
                 if(ret < 0)
                 {



More information about the Pvfs2-cvs mailing list