sync.c

Go to the documentation of this file.
00001 /* $Id: sync.c 23143 2005-11-16 21:29:08Z adam $ */
00002 /*****************************************************************************/
00003 /**
00004  * \file        dsi/lib/src/sync.c
00005  *
00006  * \brief       Implementation of the synchronization thread.
00007  *
00008  * \date        07/08/2000
00009  * \author      Lars Reuther <reuther@os.inf.tu-dresden.de>
00010  * 
00011  * This thread handles the synchronization messages from the remote
00012  * work thread. They are sent if the work thread of the remote
00013  * component waits for a packet to be committed by the local work
00014  * thread. If the specified packet is not committed yet, the wait flag
00015  * is set locally in the packet flags and the remote work thread stays
00016  * in its ipc-call until the local work thread has committed the packet and
00017  * sent the wakeup message to us (the synchronization thread).
00018  *
00019  * If the synchronization thread receives the wakeup message for a
00020  * packet (from the local work thread) and the wait flag is not set,
00021  * the notification pending flag is set in the packet flags and a
00022  * subsequent synchronization call of the remote work thread returns
00023  * immediately. This situation can happen if the remote work thread is
00024  * preempted in the middle of dsi_down.
00025  *
00026  * Currently we have two versions of the synchronization thread, one for
00027  * send components and one for receive components. While the sync thread of 
00028  * a receive component just handles wait requests for packets to be 
00029  * acknowledged by the receive component, the sync thread of a send component
00030  * also implements the mapping/copying of data to the receiver. 
00031  *
00032  * \todo Check packet number, especially if provided by the remote component
00033  *
00034  * \note It is perfectly legal for the remote component to abort an ongoing
00035  *      dsi_down() due to the end of transmission. This means, our unblock-
00036  *      IPC may fail. This is not indicated by the packet_commit() function,
00037  *      nor can it be found out somehow else.
00038  */
00039 /*****************************************************************************/
00040 
00041 /* L4/DROPS includes */
00042 #include <l4/sys/types.h>
00043 #include <l4/sys/ipc.h>
00044 #include <l4/env/errno.h>
00045 #include <l4/util/macros.h>
00046 #include <l4/thread/thread.h>
00047 
00048 /* library includes */
00049 #include <l4/dsi/dsi.h>
00050 #include "__sync.h"
00051 #include "__socket.h"
00052 #include "__thread.h"
00053 #include "__debug.h"
00054 #include "__config.h"
00055 
00056 /*****************************************************************************
00057  * helpers
00058  *****************************************************************************/
00059 
00060 /*****************************************************************************/
00061 /**
00062  * \brief Send reply to remote work thread (receiver), map packet data
00063  * 
00064  * \param socket Socket descriptor
00065  * \param packet Packet descriptor 
00066  */
00067 /*****************************************************************************/ 
00068 static inline void
00069 __wakeup_and_map(dsi_socket_t * socket,dsi_packet_t * packet)
00070 {
00071   void *addr;
00072   l4_addr_t offs;
00073   l4_addr_t size;
00074   int ret,i;
00075   l4_msgdope_t result;
00076 
00077   if (packet->sg_len > 1)
00078     Panic("DSI: map scatter gather list not yet supported!");
00079 
00080   offs = socket->sg_lists[packet->sg_list].addr;
00081   addr = socket->data_area + offs;
00082   size = socket->sg_lists[packet->sg_list].size;
00083 
00084   /* get log2 pagesize */
00085   i = 12;
00086   while (size > (1U << i))
00087     i++;
00088 
00089   if (size != (1U << i))
00090     Panic("DSI: unaligned pages not supported yet!");
00091   
00092   LOGdL(DEBUG_MAP_PACKET,"map packet %u, size %lu (%d)",packet->no,size,i);
00093   LOGdL(DEBUG_MAP_PACKET,"addr 0x%08lx, offset 0x%08lx",(unsigned long)addr,offs);
00094 
00095   /* send map message */
00096   ret = l4_ipc_send(socket->remote_socket.work_th,L4_IPC_SHORT_FPAGE,
00097                          offs,l4_fpage((l4_addr_t)addr,i,
00098                                        L4_FPAGE_RW,L4_FPAGE_MAP).fpage,
00099                          L4_IPC_SEND_TIMEOUT_0,&result);
00100   if (ret)
00101     LOG_Error("DSI: IPC error in wakeup 0x%02x",ret); 
00102 }
00103 
00104 /*****************************************************************************/
00105 /**
00106  * \brief Send reply to remote work thread (receiver), copy packet data
00107  * 
00108  * \param socket Socket descriptor
00109  * \param packet Packet descriptor 
00110  */
00111 /*****************************************************************************/ 
00112 static inline void
00113 __wakeup_and_copy(dsi_socket_t * socket,dsi_packet_t * packet)
00114 {
00115   l4_addr_t offs;
00116   l4_size_t size;
00117   int ret;
00118   struct {
00119     l4_umword_t      rcv_fpage;
00120     l4_msgdope_t size_dope;
00121     l4_msgdope_t send_dope;
00122     l4_umword_t      dw0;
00123     l4_umword_t      dw1;
00124     l4_strdope_t buf;
00125   } msg_buf;
00126   l4_msgdope_t result;
00127 
00128   if (packet->sg_len > 1)
00129     Panic("DSI: copy scatter gather list not yet supported!");
00130 
00131   offs = socket->sg_lists[packet->sg_list].addr;
00132   size = socket->sg_lists[packet->sg_list].size;
00133 
00134   LOGdL(DEBUG_COPY_PACKET,"copy packet");
00135   LOGdL(DEBUG_COPY_PACKET,"addr 0x%08lx, size %u",
00136       (unsigned)socket->data_area + offs,size);
00137 
00138   /* send message */
00139   msg_buf.size_dope = L4_IPC_DOPE(2,1);
00140   msg_buf.send_dope = L4_IPC_DOPE(2,1);
00141   msg_buf.buf.snd_str = (l4_addr_t)socket->data_area + offs;
00142   msg_buf.buf.snd_size = size;
00143   ret = l4_ipc_send(socket->remote_socket.work_th,&msg_buf,
00144                          0,0,L4_IPC_SEND_TIMEOUT_0,&result);
00145   if (ret)
00146     LOG_Error("DSI: IPC error in wakeup 0x%02x", ret);   
00147 }
00148 
00149 /*****************************************************************************/
00150 /**
00151  * \brief Synchronization thread, send component.
00152  * 
00153  * \param data thread data, pointer to socket descriptor
00154  */
00155 /*****************************************************************************/ 
00156 void
00157 dsi_sync_thread_send(void * data)
00158 {
00159   dsi_socket_t * socket;
00160   dsi_packet_t * packet;
00161   l4_threadid_t parent = l4thread_l4_id(l4thread_get_parent());
00162   int ret;
00163   l4_umword_t dw0,dw1;
00164   l4_msgdope_t result;
00165   l4_threadid_t src;
00166 
00167   LOGdL(DEBUG_SYNC_SEND,"up, parent "l4util_idfmt, l4util_idstr(parent));
00168   
00169   /* sanity checks */
00170   Assert(data != NULL);
00171   socket = (dsi_socket_t *)data;
00172 
00173   /* wait for parent to send connect message */ 
00174   ret = l4_ipc_receive(parent,L4_IPC_SHORT_MSG,&dw0,&dw1,
00175                             L4_IPC_NEVER,&result);
00176   if (ret || (dw0 != DSI_SYNC_CONNECTED))
00177     {
00178       Panic("DSI: sync setup IPC failed (0x%02X)!",ret);
00179       return;
00180     }
00181   Assert(data == (void *)dw1);
00182   
00183   LOGdL(DEBUG_SYNC_SEND,"connected.");
00184   LOGdL(DEBUG_SYNC_SEND,"remote socket %d, work "l4util_idfmt,
00185         socket->remote_socket.socket,
00186         l4util_idstr(socket->remote_socket.work_th));
00187   
00188   /* snychronization loop */
00189   while (1)
00190     {
00191       ret = l4_ipc_wait(&src,L4_IPC_SHORT_MSG,&dw0,&dw1,
00192                              L4_IPC_NEVER,&result);
00193 
00194       if (!ret)
00195         {
00196           LOGdL(DEBUG_SYNC_SEND,"msg from "l4util_idfmt, l4util_idstr(src));
00197           LOGdL(DEBUG_SYNC_SEND,"dw0 = %lu, dw1 = %lu",dw0,dw1);
00198           
00199           switch (dw0)
00200             {
00201             case DSI_SYNC_COMMITED:
00202               if (l4_task_equal(src,socket->work_th))
00203                 {
00204                   /*********************************************************
00205                    * message from local thread
00206                    ********************************************************/
00207                   /* packet commited (dw1 -> packet index), wakeup receiver 
00208                    * work thread */
00209                   LOGdL(DEBUG_SYNC_SEND,"wakeup receiver, packet %ld",dw1);
00210 
00211                   packet = &socket->packets[dw1];
00212 
00213                   /* check if receiver is already waiting */
00214                   if (packet->flags & DSI_PACKET_RX_WAITING)
00215                     {
00216                       LOGdL(DEBUG_SYNC_SEND,"receiver already waiting");
00217                       LOGdL(DEBUG_SYNC_SEND,"wakeup "l4util_idfmt,
00218                             l4util_idstr(socket->remote_socket.work_th));
00219                       
00220                       if (socket->flags & DSI_SOCKET_MAP)
00221                         __wakeup_and_map(socket,packet);
00222                       else if (socket->flags & DSI_SOCKET_COPY)
00223                         __wakeup_and_copy(socket,packet);
00224                       else
00225                         {
00226                           /* wakeup */
00227                           ret = l4_ipc_send(socket->remote_socket.work_th,
00228                                                  L4_IPC_SHORT_MSG,0,0,
00229                                                  L4_IPC_SEND_TIMEOUT_0,
00230                                                  &result);
00231                           if (ret)
00232                             LOG_Error("DSI: IPC error in wakeup 0x%02x", ret);
00233                         }
00234                       
00235                       /* reset flags */
00236                       packet->flags &= (~DSI_PACKET_RX_WAITING);
00237                     }
00238                   else
00239                     {
00240                       /* receiver not waiting yet */
00241                       LOGdL(DEBUG_SYNC_SEND,"receiver not yet waiting");
00242 
00243                       /* mark wakeup notification pending */
00244                       packet->flags |= DSI_PACKET_RX_PENDING;
00245                       
00246                     }
00247 
00248                   /* done */
00249                 } else goto e_inv_sender;
00250               break;
00251             case DSI_SYNC_WAIT:
00252               if (l4_task_equal(src,socket->remote_socket.work_th))
00253                 {
00254                   /**********************************************************
00255                    * message from remote work thread
00256                    *********************************************************/
00257                   /* wait for packet (dw1 -> packet index) */
00258                   LOGdL(DEBUG_SYNC_SEND,"receiver waits for packet %ld",dw1);
00259 
00260                   packet = &socket->packets[dw1];
00261 
00262                   /* check if wakeup notification is already pending */
00263                   if (packet->flags & DSI_PACKET_RX_PENDING)
00264                     {
00265                       LOGdL(DEBUG_SYNC_SEND,"notification pending");
00266                       LOGdL(DEBUG_SYNC_SEND,"wakeup "l4util_idfmt,
00267                             l4util_idstr(src));
00268                       
00269                       if (socket->flags & DSI_SOCKET_MAP)
00270                         __wakeup_and_map(socket,packet);
00271                       else if (socket->flags & DSI_SOCKET_COPY)
00272                         __wakeup_and_copy(socket,packet);
00273                       else
00274                         {
00275                           /* wakeup */
00276                           ret = l4_ipc_send(socket->remote_socket.work_th,
00277                                                  L4_IPC_SHORT_MSG,0,0,
00278                                                  L4_IPC_SEND_TIMEOUT_0,
00279                                                  &result);
00280                           if (ret)
00281                             LOG_Error("DSI: IPC error in reply 0x%02x",ret);
00282                         }
00283 
00284                       /* reset pending flag */
00285                       packet->flags &= (~DSI_PACKET_RX_PENDING);
00286                     }
00287                   else
00288                     {
00289                       /* no notification yet */
00290                       LOGdL(DEBUG_SYNC_SEND,"set wait flag");
00291 
00292                       packet->flags |= DSI_PACKET_RX_WAITING;
00293                       /* and set the work-thread to that of the sender
00294                          of this message, as it can be changed. */
00295                       socket->remote_socket.work_th = src;
00296                     }
00297                 } else goto e_inv_sender;
00298               break;
00299 
00300             case DSI_SYNC_RELEASE:
00301               if (l4_task_equal(src,socket->remote_socket.work_th))
00302                 {
00303                   /* receive component released packet (dw1 -> packet), call 
00304                    * notification callback */
00305 
00306                   packet = &socket->packets[dw1];
00307 
00308                   LOGdL(DEBUG_SYNC,"released packet %u (idx %lu)",
00309                         packet->no,dw1);
00310 
00311                   /* call release callback function */
00312                   if (socket->release_callback)
00313                     socket->release_callback(socket,packet);
00314 
00315 #if RELEASE_DO_CALL
00316                   /* reply */
00317                   ret = l4_ipc_send(src,L4_IPC_SHORT_MSG,0,0,
00318                                          L4_IPC_SEND_TIMEOUT_0,
00319                                     &result);
00320                   if (ret)
00321                     LOG_Error("DSI: sync notification reply failed (0x%02x)!",
00322                               ret);
00323 #endif            
00324                 } else goto e_inv_sender;
00325               break;
00326 
00327             case DSI_SYNC_MAP:
00328               if (l4_task_equal(src,socket->remote_socket.work_th))
00329                 {
00330                   /* map packet data (dw1 -> packet) to receive component */
00331                   __wakeup_and_map(socket,&socket->packets[dw1]);
00332                 } else goto e_inv_sender;
00333               break;
00334 
00335             case DSI_SYNC_COPY:
00336               if (l4_task_equal(src,socket->remote_socket.work_th))
00337                 {
00338                   /* copy packet data (dw1 -> packet) to receive component */
00339                   __wakeup_and_copy(socket,&socket->packets[dw1]);
00340                 } else goto e_inv_sender;
00341               break;
00342             default:
00343               LOG_Error("DSI: invalid command (%ld) from "l4util_idfmt, dw0,
00344                         l4util_idstr(src));
00345             }
00346           continue;
00347 
00348           e_inv_sender:
00349           LOG_Error("DSI: ignoring message from "l4util_idfmt,
00350                     l4util_idstr(src));
00351 
00352         } /* if (!ret) */
00353 
00354       LOG_Error("DSI: IPC error in sender sync thread 0x%02x",ret);
00355     } /* while(1) */
00356 
00357   /* this should never happen */
00358   Panic("DSI: left sync loop!");
00359 }
00360 
00361 /*****************************************************************************/
00362 /**
00363  * \brief Synchronization thread, receive component
00364  * \ingroup internal
00365  * 
00366  * \param data thread data, pointer to socket descriptor
00367  */
00368 /*****************************************************************************/ 
00369 void
00370 dsi_sync_thread_receive(void * data)
00371 {
00372   dsi_socket_t * socket;
00373   dsi_packet_t * packet;
00374   l4_threadid_t parent = l4thread_l4_id(l4thread_get_parent());
00375   int ret;
00376   l4_umword_t dw0,dw1;
00377   l4_msgdope_t result;
00378   l4_threadid_t src;
00379   
00380   LOGdL(DEBUG_SYNC_RECEIVE,"up, parent "l4util_idfmt, l4util_idstr(parent));
00381 
00382   /* sanity checks */
00383   Assert(data != NULL);
00384   socket = (dsi_socket_t *)data;
00385 
00386   /* wait for parent to send connect message */ 
00387   ret = l4_ipc_receive(parent,L4_IPC_SHORT_MSG,&dw0,&dw1,
00388                        L4_IPC_NEVER,&result);
00389   if (ret || (dw0 != DSI_SYNC_CONNECTED))
00390     {
00391       Panic("DSI: sync setup IPC failed (0x%02X)!",ret);
00392       return;
00393     }
00394   Assert(data == (void *)dw1);
00395   
00396   LOGdL(DEBUG_SYNC_RECEIVE,"connected.");
00397   LOGdL(DEBUG_SYNC_RECEIVE,"remote socket %d, work "l4util_idfmt,
00398         socket->remote_socket.socket,
00399         l4util_idstr(socket->remote_socket.work_th));
00400   
00401   /* synchronization thread loop */
00402   while (1)
00403     {
00404       ret = l4_ipc_wait(&src,L4_IPC_SHORT_MSG,&dw0,&dw1,
00405                              L4_IPC_NEVER,&result);      
00406 
00407       if (!ret)
00408         {
00409           LOGdL(DEBUG_SYNC_RECEIVE,"msg from "l4util_idfmt, l4util_idstr(src));
00410           LOGdL(DEBUG_SYNC_RECEIVE,"dw0 = %lu, dw1 = %lu",dw0,dw1);
00411 
00412           switch (dw0)
00413             {
00414             case DSI_SYNC_COMMITED:
00415               if (l4_task_equal(src,socket->work_th))
00416                 {
00417                   /*********************************************************
00418                    * message from local thread
00419                    ***************************************************************/
00420                   /* packet commited (dw1 -> packet index), wakeup sender
00421                    * work thread */
00422                   LOGdL(DEBUG_SYNC_RECEIVE,"wakeup sender, packet %ld",dw1);
00423 
00424                   packet = &socket->packets[dw1];
00425 
00426                   /* check if sender is already waiting */
00427                   if (packet->flags & DSI_PACKET_TX_WAITING)
00428                     {
00429                       LOGdL(DEBUG_SYNC_RECEIVE,"sender already waiting");
00430                       LOGdL(DEBUG_SYNC_RECEIVE,"wakeup "l4util_idfmt,
00431                             l4util_idstr(socket->remote_socket.work_th));
00432                       
00433                       /* wakeup */
00434                       ret = l4_ipc_send(socket->remote_socket.work_th,
00435                                              L4_IPC_SHORT_MSG,0,0,
00436                                              L4_IPC_SEND_TIMEOUT_0,
00437                                              &result);
00438                       if (ret)
00439                         LOG_Error("DSI: IPC error in wakeup 0x%02x",ret);
00440 
00441                       /* reset flags */
00442                       packet->flags &= (~DSI_PACKET_TX_WAITING);
00443                     }
00444                   else
00445                     {
00446                       /* sender not yet waiting */
00447                       LOGdL(DEBUG_SYNC_RECEIVE,"sender not yet waiting");
00448 
00449                       /* mark wakeup notification pending */
00450                       packet->flags |= DSI_PACKET_TX_PENDING;
00451                     }
00452                 } else goto e_inv_sender;
00453 
00454               break;
00455             case DSI_SYNC_WAIT:
00456               if (l4_task_equal(src,socket->remote_socket.work_th))
00457                 {
00458                   /**********************************************************
00459                    * message from remote work thread
00460                    *********************************************************/
00461                   /* wait for packet (dw1 -> packet index) */
00462                   LOGdL(DEBUG_SYNC_RECEIVE,"sender waits for packet %ld",dw1);
00463 
00464                   packet = &socket->packets[dw1];
00465 
00466                   /* check if wakeup notification is already pending */
00467                   if (packet->flags & DSI_PACKET_TX_PENDING)
00468                     {
00469                       LOGdL(DEBUG_SYNC,"notification pending");
00470                       LOGdL(DEBUG_SYNC,"wakeup "l4util_idfmt, 
00471                             l4util_idstr(src));
00472 
00473                       /* receiver already commited packet, reply immediately */
00474                       ret = l4_ipc_send(src,L4_IPC_SHORT_MSG,0,0,
00475                                              L4_IPC_SEND_TIMEOUT_0,
00476                                              &result);
00477                       if (ret)
00478                         LOG_Error("DSI: IPC error in reply 0x%02x", ret);
00479 
00480                       /* reset pending flag */
00481                       packet->flags &= (~DSI_PACKET_TX_PENDING);
00482                     }
00483                   else
00484                     {
00485                       /* no notification yet */
00486                       LOGdL(DEBUG_SYNC,"set wait flag");
00487 
00488                       /* mark packet */
00489                       packet->flags |= DSI_PACKET_TX_WAITING;
00490                       /* and set the work-thread to that of the sender
00491                          of this message, as it can be changed. */
00492                       socket->remote_socket.work_th = src;
00493                     }
00494                 } else goto e_inv_sender;
00495                   
00496               break;
00497               
00498             default:
00499               LOG_Error("DSI: invalid command (%ld) from "l4util_idfmt,
00500                         dw0, l4util_idstr(src));
00501             }
00502           continue;
00503 
00504           e_inv_sender:
00505           LOG_Error("ignoring message from "l4util_idfmt, l4util_idstr(src));
00506           
00507         } /* if (!ret) */
00508 
00509       LOG_Error("DSI: IPC error in receiver sync thread 0x%02x", ret);
00510     } /* while(1) */
00511 
00512   /* this should never happen */
00513   Panic("DSI: left sync loop!");
00514 }
00515 
00516 /*****************************************************************************/
00517 /**
00518  * \brief Send connect message to synchronization thread.
00519  * \ingroup internal
00520  *
00521  * \param socket Socket descriptor
00522  * \return 0 on success (sent start to sync thread), error code otherwise:
00523  *         - -L4_EINVAL invalid synchronization thread id
00524  *         - -L4_EIPC   IPC error calling synchronization thread
00525  */
00526 /*****************************************************************************/ 
00527 int
00528 dsi_start_sync_thread(dsi_socket_t * socket)
00529 {
00530   int ret;
00531   l4_msgdope_t result; 
00532 
00533   /* sanity check */
00534   if (l4_is_invalid_id(socket->sync_th))
00535     return -L4_EINVAL;
00536 
00537   /* send IPC message */
00538   ret = l4_ipc_send(socket->sync_th,L4_IPC_SHORT_MSG,DSI_SYNC_CONNECTED,
00539                          (l4_umword_t)socket,L4_IPC_NEVER,&result);
00540   if (ret)
00541     return -L4_EIPC;
00542   else
00543     return 0;
00544 }

Generated on Wed Apr 11 06:40:13 2012 for DSI - Drops Streaming Interface by  doxygen 1.5.6