00001
00005 #include <stdio.h>
00006 #include <pthread.h>
00007
00008 #include <utils/ConfigFile.h>
00009 #include <TimeSource/TimeSource.h>
00010
00011 #include <ipt/ipt.h>
00012 #include <ipt/sharedmem.h>
00013
00014 #include "VehPoseSource.h"
00015
00016 #include <VehPoseDest/VehPoseStructs.h>
00017
00049 class ShmemVehPoseSource : public VehPoseSource {
00050 public:
00051 ShmemVehPoseSource();
00052 virtual ~ShmemVehPoseSource();
00053
00055 virtual bool getPose(utils::Time time, VehPose& pose);
00056
00058 virtual bool getCurPose(utils::Time& time,
00059 VehPose& pose, bool blocking = false);
00060
00062 bool init(utils::ConfigFile& params, utils::SymbolTable* globals);
00063
00065 static void interpolate(const VehPoseShmemStruct& prev_pose,
00066 const VehPoseShmemStruct& next_pose, double t,
00067 VehPose& veh_pose);
00068
00069
00070 private:
00071 static void* thread_entry(void*);
00072 void collector_thread();
00073 void error(VehPose&);
00074 static void set_pose(const VehPoseShmemStruct& input, VehPose& output);
00075
00076 private:
00077 IPCommunicator* _com;
00078 IPSharedMemory* _shm;
00079
00080 bool _collector_running;
00081 pthread_t _collector_t;
00082 pthread_mutex_t _collector_mutex;
00083 pthread_cond_t _collector_cond;
00084
00085 int _history_length;
00086 int _num_poses;
00087 int _cur_pose_index;
00088 VehPoseShmemStruct* _poses;
00089
00090 int _last_secs, _last_usecs;
00091
00092 float _max_extrapolation;
00093 };
00094
00095 ShmemVehPoseSource::ShmemVehPoseSource()
00096 {
00097 _shm = NULL;
00098 _collector_running = false;
00099 _poses = NULL;
00100 _last_secs = _last_usecs = -1;
00101 }
00102
00103 ShmemVehPoseSource::~ShmemVehPoseSource()
00104 {
00105
00106 if (_shm)
00107 _com->CloseSharedMemory(_shm);
00108
00109
00110 if (_collector_running) {
00111 _collector_running = false;
00112 pthread_cancel(_collector_t);
00113 pthread_mutex_destroy(&_collector_mutex);
00114 pthread_cond_destroy(&_collector_cond);
00115 }
00116
00117 delete [] _poses;
00118 }
00119
00120 void ShmemVehPoseSource::interpolate(const VehPoseShmemStruct& prev_pose,
00121 const VehPoseShmemStruct& cur_pose,
00122 double t, VehPose& veh_pose)
00123 {
00124 VehPose prev_real_pose, cur_real_pose;
00125 set_pose(prev_pose, prev_real_pose);
00126 set_pose(cur_pose, cur_real_pose);
00127 VehPoseSource::interpolate(prev_real_pose, cur_real_pose, t, veh_pose);
00128 }
00129
00130 bool ShmemVehPoseSource::init(utils::ConfigFile& params,
00131 utils::SymbolTable* globals)
00132 {
00133
00134
00135
00136 _com =
00137 IPCommunicator::Communicator(globals,
00138 params.getString("ipt_spec",
00139 "unix:port=0;"));
00140 if (!_com)
00141 return false;
00142
00143
00144
00145 const char* mem_name = params.getString("name", VEH_POSE_SHMEM_NAME);
00146
00147 const char* machine = params.getString("machine");
00148
00149 int port = params.getInt("port", 1389);
00150 char buffer[200];
00151
00152 if (!*machine) {
00153 sprintf(buffer, "managed: name=%s;", mem_name);
00154 } else {
00155 sprintf(buffer, "managed: name='%s@%s|%d';", mem_name, machine, port);
00156 }
00157
00158 const char* mem_spec = params.getString("mem", buffer);
00159 _shm =
00160 _com->OpenSharedMemory(mem_spec, VEH_POSE_SHMEM_FMT,
00161 sizeof(VehPoseShmemStruct));
00162
00163 if (!_shm) {
00164
00165 fprintf(stderr,
00166 "ShmemVehPoseSource::init: Problem opening shared memory %s\n",
00167 mem_spec);
00168 return false;
00169 }
00170
00171
00172 _history_length = params.getInt("history_length", 100);
00173 _num_poses = 0;
00174 _cur_pose_index = -1;
00175 _poses = new VehPoseShmemStruct[_history_length];
00176
00177 _max_extrapolation = params.getFloat("max_extrapolation", 0.2);
00178
00179
00180 pthread_mutex_init(&_collector_mutex, NULL);
00181 pthread_cond_init(&_collector_cond, NULL);
00182 _collector_running = true;
00183 pthread_create(&_collector_t, NULL, thread_entry, this);
00184
00185 return true;
00186 }
00187
00188
00189 void* ShmemVehPoseSource::thread_entry(void* data)
00190 {
00191 ((ShmemVehPoseSource*) data)->collector_thread();
00192 return NULL;
00193 }
00194
00195
00196 void ShmemVehPoseSource::collector_thread()
00197 {
00198 VehPoseShmemStruct incoming;
00199 while (_collector_running) {
00200
00201 if (!_shm->Wait()) {
00202 utils::Time::sleep(0.02);
00203 continue;
00204 }
00205
00206
00207 _shm->FormattedData((void*) &incoming);
00208
00209
00210 if (!incoming.secs && !incoming.usecs)
00211 continue;
00212
00213
00214 pthread_mutex_lock(&_collector_mutex);
00215 if (_num_poses != _history_length)
00216 _num_poses++;
00217 _cur_pose_index = (_cur_pose_index + 1) % _history_length;
00218 _poses[_cur_pose_index] = incoming;
00219 pthread_mutex_unlock(&_collector_mutex);
00220
00221
00222 pthread_cond_signal(&_collector_cond);
00223 }
00224 }
00225
00226
00227
00228 void ShmemVehPoseSource::error(VehPose& veh_pose)
00229 {
00230 veh_pose.pos = utils::Vec3d();
00231 veh_pose.ori = utils::Rotation();
00232 pthread_mutex_unlock(&_collector_mutex);
00233 }
00234
00235
00236
00237 void ShmemVehPoseSource::set_pose(const VehPoseShmemStruct& input,
00238 VehPose& output)
00239 {
00240 output.pos = utils::Vec3d(input.data.x, input.data.y, input.data.z);
00241 output.ori = utils::Rotation(input.data.ori[0], input.data.ori[1],
00242 input.data.ori[2], input.data.ori[3]);
00243 }
00244
00245
00246 bool ShmemVehPoseSource::getPose(utils::Time now, VehPose& veh_pose)
00247 {
00248
00249 pthread_mutex_lock(&_collector_mutex);
00250
00251
00252 VehPoseShmemStruct* cur_pose = &_poses[_cur_pose_index];
00253 utils::Time t(cur_pose->secs, cur_pose->usecs);
00254
00255
00256 if (now > t) {
00257
00258 double elapsed = (now - t).getValue();
00259 if (elapsed > _max_extrapolation) {
00260 fprintf(stderr, "ShmemVehPoseSource::getPose: "
00261 "Pose lookup time too far in the future, delta = %f\n",
00262 (now - t).getValue());
00263 error(veh_pose);
00264 return false;
00265 }
00266
00267 set_pose(*cur_pose, veh_pose);
00268
00269
00270 VehPoseShmemStruct* prev_pose;
00271 utils::Time pt;
00272 int prev_index = _cur_pose_index;
00273 while (1) {
00274 prev_index--;
00275 if (prev_index < 0 && _num_poses < _history_length) {
00276 fprintf(stderr, "ShmemVehPoseSource::getPose: "
00277 "Cannot extrapolate yet (%5.2f)\n",
00278 (now - t).getValue());
00279 error(veh_pose);
00280 return false;
00281 }
00282 prev_pose = &_poses[prev_index];
00283 pt.setValue(prev_pose->secs, prev_pose->usecs);
00284 if (pt > t) {
00285 fprintf(stderr, "ShmemVehPoseSource::getPose: "
00286 "Cannot extrapolate with current history (%5.2f)\n",
00287 (now - t).getValue());
00288 error(veh_pose);
00289 return false;
00290 }
00291 if (pt != t)
00292 break;
00293 }
00294
00295
00296 interpolate(*prev_pose, *cur_pose, 1 + double(now-t)/(t-pt), veh_pose);
00297
00298
00299 pthread_mutex_unlock(&_collector_mutex);
00300 return true;
00301 }
00302
00303
00304
00305 int cur_index = _cur_pose_index;
00306 VehPoseShmemStruct* prev_pose = cur_pose;
00307 for (int i=0;i<_num_poses-1;i++) {
00308 cur_index--;
00309 if (cur_index < 0)
00310 cur_index = _history_length-1;
00311 prev_pose = &_poses[cur_index];
00312 t.setValue(prev_pose->secs, prev_pose->usecs);
00313 if (now > t) {
00314
00315
00316 utils::Time cur(cur_pose->secs, cur_pose->usecs);
00317 double dist = (now-t).getValue()/(cur-t).getValue();
00318
00319
00320 interpolate(*prev_pose, *cur_pose, dist, veh_pose);
00321
00322
00323 pthread_mutex_unlock(&_collector_mutex);
00324 return true;
00325 }
00326 cur_pose = prev_pose;
00327 }
00328
00329
00330 fprintf(stderr, "ShmemVehPoseSource::getPose: "
00331 "State lookup time too old, delta = %f\n",
00332 (now - TimeSource::now()).getValue());
00333 error(veh_pose);
00334
00335 return false;
00336 }
00337
00338
00339 bool ShmemVehPoseSource::getCurPose(utils::Time& time, VehPose& veh_pose,
00340 bool blocking)
00341 {
00342
00343 pthread_mutex_lock(&_collector_mutex);
00344
00345 if (blocking) {
00346
00347 if (_cur_pose_index < 0 ||
00348 (_poses[_cur_pose_index].secs == _last_secs &&
00349 _poses[_cur_pose_index].usecs == _last_usecs)) {
00350
00351 if (pthread_cond_wait(&_collector_cond, &_collector_mutex)) {
00352 pthread_mutex_unlock(&_collector_mutex);
00353 perror("ShmemVehPoseSource::getCurPose: waiting for condition");
00354 error(veh_pose);
00355 return false;
00356 }
00357 }
00358 }
00359
00360
00361 VehPoseShmemStruct& result = _poses[_cur_pose_index];
00362 set_pose(result, veh_pose);
00363 time.setValue(result.secs, result.usecs);
00364
00365
00366 bool res = !(_last_secs == result.secs && _last_usecs == result.usecs);
00367 _last_secs = result.secs;
00368 _last_usecs = result.usecs;
00369
00370
00371 pthread_mutex_unlock(&_collector_mutex);
00372
00373 return res;
00374 }
00375
00377 UTILS_INTF_CREATOR(VehPoseSource, shmem, gen, params, globals)
00378 {
00379 UTILS_INTF_REPORT(VehPoseSource, shmem);
00380 ShmemVehPoseSource* player = new ShmemVehPoseSource();
00381 if (!player->init(*params, globals)) {
00382 delete player;
00383 return NULL;
00384 }
00385 return player;
00386 }