Main Page | Modules | Namespace List | Class Hierarchy | Class List | Directories | File List | Class Members | File Members | Related Pages

ShmemVehPoseSource.cc

Go to the documentation of this file.
00001 
00005 #include <stdio.h>
00006 #include <pthread.h>
00007 
00008 #include <utils/ConfigFile.h>
00009 #include <TimeSource/TimeSource.h>
00010 
00011 #include <ipt/ipt.h>
00012 #include <ipt/sharedmem.h>
00013 
00014 #include "VehPoseSource.h"
00015 
00016 #include <VehPoseDest/VehPoseStructs.h>
00017 
00049 class ShmemVehPoseSource : public VehPoseSource {
00050 public:
00051   ShmemVehPoseSource();
00052   virtual ~ShmemVehPoseSource();
00053 
00055   virtual bool getPose(utils::Time time, VehPose& pose);
00056 
00058   virtual bool getCurPose(utils::Time& time,
00059                           VehPose& pose, bool blocking = false);
00060 
00062   bool init(utils::ConfigFile& params, utils::SymbolTable* globals);
00063 
00065   static void interpolate(const VehPoseShmemStruct& prev_pose, 
00066                           const VehPoseShmemStruct& next_pose, double t,
00067                           VehPose& veh_pose);
00068 
00069 
00070 private:
00071   static void* thread_entry(void*);
00072   void collector_thread();
00073   void error(VehPose&); 
00074   static void set_pose(const VehPoseShmemStruct& input, VehPose& output);
00075 
00076 private:
00077   IPCommunicator* _com;  // the IPT communicator
00078   IPSharedMemory* _shm;  // the pose source shared memory region
00079 
00080   bool _collector_running;  // true if the collector thread is running
00081   pthread_t _collector_t;  // the collector thread ID
00082   pthread_mutex_t _collector_mutex;  // mutex which guards ring buffer
00083   pthread_cond_t _collector_cond;  // conditional which signals new pose
00084 
00085   int _history_length;  // the size of the pose ring buffer
00086   int _num_poses;      // number of poses in the ring buffer
00087   int _cur_pose_index;  // most recent pose index in the ring buffer
00088   VehPoseShmemStruct* _poses;  // the ring buffer
00089 
00090   int _last_secs, _last_usecs;  // time stamp of the last pose received
00091                                 // by getCurPose
00092   float _max_extrapolation;  // how far can we extrapolate pose information?
00093 };
00094 
00095 ShmemVehPoseSource::ShmemVehPoseSource()
00096 {
00097   _shm = NULL;
00098   _collector_running = false;
00099   _poses = NULL;
00100   _last_secs = _last_usecs = -1;
00101 }
00102 
00103 ShmemVehPoseSource::~ShmemVehPoseSource()
00104 {
00105   // close shared memory, if necessary
00106   if (_shm)
00107     _com->CloseSharedMemory(_shm);
00108 
00109   // shutdown collector thread and cleanup, if necessary
00110   if (_collector_running) {
00111     _collector_running = false;
00112     pthread_cancel(_collector_t);
00113     pthread_mutex_destroy(&_collector_mutex);
00114     pthread_cond_destroy(&_collector_cond);
00115   }
00116 
00117   delete [] _poses;
00118 }
00119 
00120 void ShmemVehPoseSource::interpolate(const VehPoseShmemStruct& prev_pose, 
00121                                      const VehPoseShmemStruct& cur_pose,
00122                                      double t, VehPose& veh_pose)
00123 {
00124   VehPose prev_real_pose, cur_real_pose;
00125   set_pose(prev_pose, prev_real_pose);
00126   set_pose(cur_pose, cur_real_pose);
00127   VehPoseSource::interpolate(prev_real_pose, cur_real_pose, t, veh_pose);
00128 }
00129 
00130 bool ShmemVehPoseSource::init(utils::ConfigFile& params,
00131                               utils::SymbolTable* globals)
00132 {
00133   // IPCommunicator::Communicator creates an IPT communicator only if
00134   // one has not been created and cached in globals already.
00135   // If it does create one, it caches it in globals.
00136   _com =
00137     IPCommunicator::Communicator(globals, 
00138                                  params.getString("ipt_spec",
00139                                                    "unix:port=0;"));
00140   if (!_com)
00141     return false;
00142 
00143   // create the shared memory region
00144   // for convenience, the user can just specify name
00145   const char* mem_name = params.getString("name", VEH_POSE_SHMEM_NAME);
00146   // optionally, machine (empty means on local host
00147   const char* machine = params.getString("machine");
00148   // and the port that the memory manager is running on
00149   int port = params.getInt("port", 1389);
00150   char buffer[200];
00151   // to create a default memory specification for managed shared memory
00152   if (!*machine) {
00153     sprintf(buffer, "managed: name=%s;", mem_name);
00154   } else {
00155     sprintf(buffer, "managed: name='%s@%s|%d';", mem_name, machine, port);
00156   }
00157   // of course, this can be overridden through explicit specification of "mem"
00158   const char* mem_spec = params.getString("mem", buffer);
00159   _shm =
00160     _com->OpenSharedMemory(mem_spec, VEH_POSE_SHMEM_FMT,
00161                           sizeof(VehPoseShmemStruct));
00162   // if we can't open shared memory
00163   if (!_shm) {
00164     // print an error and return bad
00165     fprintf(stderr,
00166             "ShmemVehPoseSource::init: Problem opening shared memory %s\n",
00167             mem_spec);
00168     return false;
00169   }
00170 
00171   // now create and set up the ring buffer
00172   _history_length = params.getInt("history_length",  100);
00173   _num_poses = 0;
00174   _cur_pose_index = -1;
00175   _poses = new VehPoseShmemStruct[_history_length];
00176 
00177   _max_extrapolation = params.getFloat("max_extrapolation", 0.2);
00178 
00179   // and finally, kick off the collector thread
00180   pthread_mutex_init(&_collector_mutex, NULL);
00181   pthread_cond_init(&_collector_cond, NULL);
00182   _collector_running = true;
00183   pthread_create(&_collector_t, NULL, thread_entry, this);
00184 
00185   return true;
00186 }
00187 
00188 // entry point for the collector thread
00189 void* ShmemVehPoseSource::thread_entry(void* data)
00190 {
00191   ((ShmemVehPoseSource*) data)->collector_thread();
00192   return NULL;
00193 }
00194 
00195 // the collector thread
00196 void ShmemVehPoseSource::collector_thread()
00197 {
00198   VehPoseShmemStruct incoming;
00199   while (_collector_running) {
00200     // wait for new data in the shared memory
00201     if (!_shm->Wait()) {
00202       utils::Time::sleep(0.02);  // make sure we don't busy wait
00203       continue;
00204     }
00205 
00206     // unmarshal it and stick pose in incoming
00207     _shm->FormattedData((void*) &incoming);
00208     // the time tag of the pose can get set to 0 on startup and shutdown
00209     // of the module producing the poses.  Just skip these.
00210     if (!incoming.secs && !incoming.usecs)
00211       continue;
00212 
00213     // Put the new pose in the ring buffer
00214     pthread_mutex_lock(&_collector_mutex);
00215     if (_num_poses != _history_length)
00216       _num_poses++;
00217     _cur_pose_index = (_cur_pose_index + 1) % _history_length;
00218     _poses[_cur_pose_index] = incoming;
00219     pthread_mutex_unlock(&_collector_mutex);
00220 
00221     // signal we have new data for anyone blocking in getCurPose
00222     pthread_cond_signal(&_collector_cond);
00223   }
00224 }
00225 
00226 // convenience method for invalidating sensor pose and releasing the
00227 // collector mutex
00228 void ShmemVehPoseSource::error(VehPose& veh_pose)
00229 {
00230   veh_pose.pos = utils::Vec3d();
00231   veh_pose.ori = utils::Rotation();
00232   pthread_mutex_unlock(&_collector_mutex);
00233 }
00234 
00235 // convenience method for setting a sensor pose output from the data in the
00236 // shared memory region input
00237 void ShmemVehPoseSource::set_pose(const VehPoseShmemStruct& input,
00238                                   VehPose& output)
00239 {
00240   output.pos = utils::Vec3d(input.data.x, input.data.y, input.data.z);
00241   output.ori = utils::Rotation(input.data.ori[0], input.data.ori[1], 
00242                                input.data.ori[2], input.data.ori[3]);
00243 }
00244 
00245 // Lookup the sensor pose at time now and put it in sensor pose, if possible
00246 bool ShmemVehPoseSource::getPose(utils::Time now, VehPose& veh_pose)
00247 {
00248   // lock the ring buffer
00249   pthread_mutex_lock(&_collector_mutex);
00250 
00251   // Get the latest sensor pose
00252   VehPoseShmemStruct* cur_pose = &_poses[_cur_pose_index];
00253   utils::Time t(cur_pose->secs, cur_pose->usecs);
00254 
00255   // if the requested time is after the latest sensor pose
00256   if (now > t) {
00257     // figure out if we can extrapolate
00258     double elapsed = (now - t).getValue();
00259     if (elapsed > _max_extrapolation) {
00260       fprintf(stderr, "ShmemVehPoseSource::getPose: "
00261               "Pose lookup time too far in the future, delta = %f\n",
00262               (now - t).getValue());
00263       error(veh_pose);
00264       return false;
00265     }
00266     // default all fields to cur_pose value.
00267     set_pose(*cur_pose, veh_pose);
00268 
00269     // get previous pose
00270     VehPoseShmemStruct* prev_pose;
00271     utils::Time pt;
00272     int prev_index = _cur_pose_index;
00273     while (1) {
00274       prev_index--;
00275       if (prev_index < 0 && _num_poses < _history_length) {
00276         fprintf(stderr, "ShmemVehPoseSource::getPose: "
00277                 "Cannot extrapolate yet (%5.2f)\n",
00278                 (now - t).getValue());
00279         error(veh_pose);
00280         return false;
00281       }
00282       prev_pose = &_poses[prev_index];
00283       pt.setValue(prev_pose->secs, prev_pose->usecs); 
00284       if (pt > t) {
00285         fprintf(stderr, "ShmemVehPoseSource::getPose: "
00286                 "Cannot extrapolate with current history (%5.2f)\n",
00287                 (now - t).getValue());
00288         error(veh_pose);
00289         return false;
00290       }
00291       if (pt != t)
00292         break;
00293     }
00294     
00295     // extrapolate pose (with t > 1)
00296     interpolate(*prev_pose, *cur_pose, 1 + double(now-t)/(t-pt), veh_pose);
00297 
00298     // unlock the ring buffer and return
00299     pthread_mutex_unlock(&_collector_mutex);
00300     return true;
00301   }
00302 
00303   // go through the ring buffer looking for the elements bracketing
00304   // the requested time
00305   int cur_index = _cur_pose_index;
00306   VehPoseShmemStruct* prev_pose = cur_pose;
00307   for (int i=0;i<_num_poses-1;i++) {
00308     cur_index--;
00309     if (cur_index < 0)
00310       cur_index = _history_length-1;
00311     prev_pose = &_poses[cur_index];
00312     t.setValue(prev_pose->secs, prev_pose->usecs);
00313     if (now > t) {
00314       // we have found the bracketing elements
00315 
00316       utils::Time cur(cur_pose->secs, cur_pose->usecs);
00317       double dist = (now-t).getValue()/(cur-t).getValue();
00318 
00319       // interpolate appropriately
00320       interpolate(*prev_pose, *cur_pose, dist, veh_pose);
00321 
00322       // unlock the ring buffer and return
00323       pthread_mutex_unlock(&_collector_mutex);
00324       return true;
00325     }
00326     cur_pose = prev_pose;
00327   }
00328 
00329   // requrest time is too far in the past
00330   fprintf(stderr, "ShmemVehPoseSource::getPose: "
00331           "State lookup time too old, delta = %f\n",
00332           (now - TimeSource::now()).getValue());
00333   error(veh_pose);
00334 
00335   return false;
00336 }
00337 
00338 // get the latest sensor pose, blocking if necessary
00339 bool ShmemVehPoseSource::getCurPose(utils::Time& time, VehPose& veh_pose,
00340                                     bool blocking)
00341 {
00342   // lock the ring buffer
00343   pthread_mutex_lock(&_collector_mutex);
00344 
00345   if (blocking) {
00346     // if we are not at the first one and do not have new data
00347     if (_cur_pose_index < 0 ||
00348         (_poses[_cur_pose_index].secs == _last_secs &&
00349          _poses[_cur_pose_index].usecs == _last_usecs)) {
00350       // wait for new data
00351       if (pthread_cond_wait(&_collector_cond, &_collector_mutex)) {
00352         pthread_mutex_unlock(&_collector_mutex);
00353         perror("ShmemVehPoseSource::getCurPose: waiting for condition");
00354         error(veh_pose);
00355         return false;
00356       }
00357     }
00358   } 
00359 
00360   // get the latest sensor pose
00361   VehPoseShmemStruct& result = _poses[_cur_pose_index];
00362   set_pose(result, veh_pose);
00363   time.setValue(result.secs, result.usecs);
00364 
00365   // mark if this is new data or not
00366   bool res = !(_last_secs == result.secs && _last_usecs == result.usecs);
00367   _last_secs = result.secs;
00368   _last_usecs = result.usecs;
00369 
00370   // unlock the ring buffer
00371   pthread_mutex_unlock(&_collector_mutex);
00372 
00373   return res;
00374 }
00375 
00377 UTILS_INTF_CREATOR(VehPoseSource, shmem, gen, params, globals)
00378 {
00379   UTILS_INTF_REPORT(VehPoseSource, shmem);
00380   ShmemVehPoseSource* player = new ShmemVehPoseSource();
00381   if (!player->init(*params, globals)) {
00382     delete player;
00383     return NULL;
00384   }
00385   return player;
00386 }

Generated on Fri Jun 16 13:21:19 2006 for ModUtils by  doxygen 1.4.4