Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef __IGN_TRANSPORT_NODE_HH_INCLUDED__
19 #define __IGN_TRANSPORT_NODE_HH_INCLUDED__
20 
21 #ifdef _MSC_VER
22 #pragma warning(push, 0)
23 #endif
24 #include <google/protobuf/message.h>
25 #ifdef _MSC_VER
26 #pragma warning(pop)
27 #endif
28 
29 #include <algorithm>
30 #include <functional>
31 #include <map>
32 #include <memory>
33 #include <mutex>
34 #include <string>
35 #include <unordered_set>
36 #include <vector>
37 
48 
49 namespace ignition
50 {
51  namespace transport
52  {
53  class NodePrivate;
54 
59 
65  {
68  public: Node(const NodeOptions &_options = NodeOptions());
69 
71  public: virtual ~Node();
72 
79  public: template<typename T> bool Advertise(const std::string &_topic,
80  const AdvertiseOptions &_options = AdvertiseOptions())
81  {
82  std::string fullyQualifiedTopic;
83  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
84  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
85  {
86  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
87  return false;
88  }
89 
90  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
91 
92  auto currentTopics = this->TopicsAdvertised();
93 
94  if (currentTopics.find(fullyQualifiedTopic) != currentTopics.end())
95  {
96  std::cerr << "Topic [" << _topic << "] already advertised. You cannot"
97  << " advertise the same topic twice on the same node."
98  << " If you want to advertise the same topic with different"
99  << " types, use separate nodes" << std::endl;
100  return false;
101  }
102 
103  // Add the topic to the list of advertised topics (if it was not before)
104  this->TopicsAdvertised().insert(fullyQualifiedTopic);
105 
106  // Notify the discovery service to register and advertise my topic.
107  MessagePublisher publisher(fullyQualifiedTopic,
108  this->Shared()->myAddress,
109  this->Shared()->myControlAddress,
110  this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
111  T().GetTypeName());
112 
113  if (!this->Shared()->discovery->AdvertiseMsg(publisher))
114  {
115  std::cerr << "Node::Advertise(): Error advertising a topic. "
116  << "Did you forget to start the discovery service?"
117  << std::endl;
118  return false;
119  }
120 
121  return true;
122  }
123 
126  public: std::vector<std::string> AdvertisedTopics() const;
127 
131  public: bool Unadvertise(const std::string &_topic);
132 
137  public: bool Publish(const std::string &_topic,
138  const ProtoMsg &_msg);
139 
147  public: template<typename T> bool Subscribe(
148  const std::string &_topic,
149  void(*_cb)(const T &_msg))
150  {
151  std::function<void(const T &)> f = [_cb](const T & _internalMsg)
152  {
153  (*_cb)(_internalMsg);
154  };
155 
156  return this->Subscribe<T>(_topic, f);
157  }
158 
165  public: template<typename T> bool Subscribe(
166  const std::string &_topic,
167  std::function<void(const T &_msg)> &_cb)
168  {
169  std::string fullyQualifiedTopic;
170  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
171  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
172  {
173  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
174  return false;
175  }
176 
177  // Create a new subscription handler.
178  std::shared_ptr<SubscriptionHandler<T>> subscrHandlerPtr(
179  new SubscriptionHandler<T>(this->NodeUuid()));
180 
181  // Insert the callback into the handler.
182  subscrHandlerPtr->SetCallback(_cb);
183 
184  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
185 
186  // Store the subscription handler. Each subscription handler is
187  // associated with a topic. When the receiving thread gets new data,
188  // it will recover the subscription handler associated to the topic and
189  // will invoke the callback.
190  this->Shared()->localSubscriptions.AddHandler(
191  fullyQualifiedTopic, this->NodeUuid(), subscrHandlerPtr);
192 
193  // Add the topic to the list of subscribed topics (if it was not before)
194  this->TopicsSubscribed().insert(fullyQualifiedTopic);
195 
196  // Discover the list of nodes that publish on the topic.
197  if (!this->Shared()->discovery->DiscoverMsg(fullyQualifiedTopic))
198  {
199  std::cerr << "Node::Subscribe(): Error discovering a topic. "
200  << "Did you forget to start the discovery service?"
201  << std::endl;
202  return false;
203  }
204 
205  return true;
206  }
207 
216  public: template<typename C, typename T> bool Subscribe(
217  const std::string &_topic,
218  void(C::*_cb)(const T &_msg),
219  C *_obj)
220  {
221  std::function<void(const T &)> f = [_cb, _obj](const T & _internalMsg)
222  {
223  auto cb = std::bind(_cb, _obj, std::placeholders::_1);
224  cb(_internalMsg);
225  };
226 
227  return this->Subscribe<T>(_topic, f);
228  }
229 
235  public: std::vector<std::string> SubscribedTopics() const;
236 
240  public: bool Unsubscribe(const std::string &_topic);
241 
254  public: template<typename T1, typename T2> bool Advertise(
255  const std::string &_topic,
256  void(*_cb)(const T1 &_req, T2 &_rep, bool &_result),
257  const AdvertiseOptions &_options = AdvertiseOptions())
258  {
259  std::function<void(const T1 &, T2 &, bool &)> f =
260  [_cb](const T1 &_internalReq, T2 &_internalRep, bool &_internalResult)
261  {
262  (*_cb)(_internalReq, _internalRep, _internalResult);
263  };
264 
265  return this->Advertise<T1, T2>(_topic, f, _options);
266  }
267 
280  public: template<typename T1, typename T2> bool Advertise(
281  const std::string &_topic,
282  std::function<void(const T1 &_req, T2 &_rep, bool &_result)> &_cb,
283  const AdvertiseOptions &_options = AdvertiseOptions())
284  {
285  std::string fullyQualifiedTopic;
286  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
287  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
288  {
289  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
290  return false;
291  }
292 
293  // Create a new service reply handler.
294  std::shared_ptr<RepHandler<T1, T2>> repHandlerPtr(
295  new RepHandler<T1, T2>());
296 
297  // Insert the callback into the handler.
298  repHandlerPtr->SetCallback(_cb);
299 
300  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
301 
302  // Add the topic to the list of advertised services.
303  this->SrvsAdvertised().insert(fullyQualifiedTopic);
304 
305  // Store the replier handler. Each replier handler is
306  // associated with a topic. When the receiving thread gets new requests,
307  // it will recover the replier handler associated to the topic and
308  // will invoke the service call.
309  this->Shared()->repliers.AddHandler(
310  fullyQualifiedTopic, this->NodeUuid(), repHandlerPtr);
311 
312  // Notify the discovery service to register and advertise my responser.
313  ServicePublisher publisher(fullyQualifiedTopic,
314  this->Shared()->myReplierAddress,
315  this->Shared()->replierId.ToString(),
316  this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
317  T1().GetTypeName(), T2().GetTypeName());
318 
319  if (!this->Shared()->discovery->AdvertiseSrv(publisher))
320  {
321  std::cerr << "Node::Advertise(): Error advertising a service. "
322  << "Did you forget to start the discovery service?"
323  << std::endl;
324  return false;
325  }
326 
327  return true;
328  }
329 
343  public: template<typename C, typename T1, typename T2> bool Advertise(
344  const std::string &_topic,
345  void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result),
346  C *_obj,
347  const AdvertiseOptions &_options = AdvertiseOptions())
348  {
349  std::function<void(const T1 &, T2 &, bool &)> f =
350  [_cb, _obj](const T1 &_internalReq,
351  T2 &_internalRep,
352  bool &_internalResult)
353  {
354  auto cb = std::bind(_cb, _obj, std::placeholders::_1,
355  std::placeholders::_2, std::placeholders::_3);
356  cb(_internalReq, _internalRep, _internalResult);
357  };
358 
359  return this->Advertise<T1, T2>(_topic, f, _options);
360  }
361 
364  public: std::vector<std::string> AdvertisedServices() const;
365 
376  public: template<typename T1, typename T2> bool Request(
377  const std::string &_topic,
378  const T1 &_req,
379  void(*_cb)(const T2 &_rep, const bool _result))
380  {
381  std::function<void(const T2 &, const bool)> f =
382  [_cb](const T2 &_internalRep, const bool _internalResult)
383  {
384  (*_cb)(_internalRep, _internalResult);
385  };
386 
387  return this->Request<T1, T2>(_topic, _req, f);
388  }
389 
400  public: template<typename T1, typename T2> bool Request(
401  const std::string &_topic,
402  const T1 &_req,
403  std::function<void(const T2 &_rep, const bool _result)> &_cb)
404  {
405  std::string fullyQualifiedTopic;
406  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
407  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
408  {
409  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
410  return false;
411  }
412 
413  bool localResponserFound;
414  IRepHandlerPtr repHandler;
415  {
416  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
417  localResponserFound = this->Shared()->repliers.FirstHandler(
418  fullyQualifiedTopic, T1().GetTypeName(), T2().GetTypeName(),
419  repHandler);
420  }
421 
422  // If the responser is within my process.
423  if (localResponserFound)
424  {
425  // There is a responser in my process, let's use it.
426  T2 rep;
427  bool result;
428  repHandler->RunLocalCallback(_req, rep, result);
429 
430  _cb(rep, result);
431  return true;
432  }
433 
434  // Create a new request handler.
435  std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
436  new ReqHandler<T1, T2>(this->NodeUuid()));
437 
438  // Insert the request's parameters.
439  reqHandlerPtr->SetMessage(_req);
440 
441  // Insert the callback into the handler.
442  reqHandlerPtr->SetCallback(_cb);
443 
444  {
445  std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
446 
447  // Store the request handler.
448  this->Shared()->requests.AddHandler(
449  fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
450 
451  // If the responser's address is known, make the request.
452  SrvAddresses_M addresses;
453  if (this->Shared()->discovery->SrvPublishers(
454  fullyQualifiedTopic, addresses))
455  {
456  this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
457  T1().GetTypeName(), T2().GetTypeName());
458  }
459  else
460  {
461  // Discover the service responser.
462  if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
463  {
464  std::cerr << "Node::Request(): Error discovering a service. "
465  << "Did you forget to start the discovery service?"
466  << std::endl;
467  return false;
468  }
469  }
470  }
471 
472  return true;
473  }
474 
486  public: template<typename C, typename T1, typename T2> bool Request(
487  const std::string &_topic,
488  const T1 &_req,
489  void(C::*_cb)(const T2 &_rep, const bool _result),
490  C *_obj)
491  {
492  std::function<void(const T2 &, const bool)> f =
493  [_cb, _obj](const T2 &_internalRep, const bool _internalResult)
494  {
495  auto cb = std::bind(_cb, _obj, std::placeholders::_1,
496  std::placeholders::_2);
497  cb(_internalRep, _internalResult);
498  };
499 
500  return this->Request<T1, T2>(_topic, _req, f);
501  }
502 
511  public: template<typename T1, typename T2> bool Request(
512  const std::string &_topic,
513  const T1 &_req,
514  const unsigned int &_timeout,
515  T2 &_rep,
516  bool &_result)
517  {
518  std::string fullyQualifiedTopic;
519  if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
520  this->Options().NameSpace(), _topic, fullyQualifiedTopic))
521  {
522  std::cerr << "Topic [" << _topic << "] is not valid." << std::endl;
523  return false;
524  }
525 
526  // Create a new request handler.
527  std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
528  new ReqHandler<T1, T2>(this->NodeUuid()));
529 
530  // Insert the request's parameters.
531  reqHandlerPtr->SetMessage(_req);
532 
533  std::unique_lock<std::recursive_mutex> lk(this->Shared()->mutex);
534 
535  // If the responser is within my process.
536  IRepHandlerPtr repHandler;
537  if (this->Shared()->repliers.FirstHandler(fullyQualifiedTopic,
538  T1().GetTypeName(), T2().GetTypeName(), repHandler))
539  {
540  // There is a responser in my process, let's use it.
541  repHandler->RunLocalCallback(_req, _rep, _result);
542  return true;
543  }
544 
545  // Store the request handler.
546  this->Shared()->requests.AddHandler(
547  fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
548 
549  // If the responser's address is known, make the request.
550  SrvAddresses_M addresses;
551  if (this->Shared()->discovery->SrvPublishers(
552  fullyQualifiedTopic, addresses))
553  {
554  this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
555  T1().GetTypeName(), T2().GetTypeName());
556  }
557  else
558  {
559  // Discover the service responser.
560  if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
561  {
562  std::cerr << "Node::Request(): Error discovering a service. "
563  << "Did you forget to start the discovery service?"
564  << std::endl;
565  return false;
566  }
567  }
568 
569  // Wait until the REP is available.
570  bool executed = reqHandlerPtr->WaitUntil(lk, _timeout);
571 
572  // The request was not executed.
573  if (!executed)
574  return false;
575 
576  // The request was executed but did not succeed.
577  if (!reqHandlerPtr->Result())
578  {
579  _result = false;
580  return true;
581  }
582 
583  // Parse the response.
584  if (!_rep.ParseFromString(reqHandlerPtr->Response()))
585  {
586  std::cerr << "Node::Request(): Error Parsing the response"
587  << std::endl;
588  _result = false;
589  return true;
590  }
591 
592  _result = true;
593  return true;
594  }
595 
599  public: bool UnadvertiseSrv(const std::string &_topic);
600 
607  public: void TopicList(std::vector<std::string> &_topics) const;
608 
613  public: bool TopicInfo(const std::string &_topic,
614  std::vector<MessagePublisher> &_publishers) const;
615 
622  public: void ServiceList(std::vector<std::string> &_services) const;
623 
628  public: bool ServiceInfo(const std::string &_service,
629  std::vector<ServicePublisher> &_publishers) const;
630 
633  private: const std::string &Partition() const;
634 
637  private: const std::string &NameSpace() const;
638 
642  private: NodeShared *Shared() const;
643 
646  private: const std::string &NodeUuid() const;
647 
650  private: std::unordered_set<std::string> &TopicsAdvertised() const;
651 
654  private: std::unordered_set<std::string> &TopicsSubscribed() const;
655 
658  private: std::unordered_set<std::string> &SrvsAdvertised() const;
659 
662  private: NodeOptions &Options() const;
663 
666  protected: std::unique_ptr<transport::NodePrivate> dataPtr;
667  };
668  }
669 }
670 #endif
bool Advertise(const std::string &_topic, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new topic.
Definition: Node.hh:79
static bool FullyQualifiedName(const std::string &_partition, const std::string &_ns, const std::string &_topic, std::string &_name)
Get the full topic path given a namespace and a topic name.
#define IGNITION_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:56
std::map< std::string, std::vector< ServicePublisher > > SrvAddresses_M
Definition: TransportTypes.hh:58
bool Request(const std::string &_topic, const T1 &_req, void(*_cb)(const T2 &_rep, const bool _result))
Request a new service using a non-blocking call.
Definition: Node.hh:376
A class for customizing the behavior of the Node.
Definition: NodeOptions.hh:35
It creates a reply handler for the specific protobuf messages used.
Definition: ReqHandler.hh:175
bool Subscribe(const std::string &_topic, void(*_cb)(const T &_msg))
Subscribe to a topic registering a callback.
Definition: Node.hh:147
ignition/transport/AdvertiseOptions.hh
Definition: AdvertiseOptions.hh:50
bool Advertise(const std::string &_topic, std::function< void(const T1 &_req, T2 &_rep, bool &_result)> &_cb, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:280
Private data for the Node class.
Definition: NodeShared.hh:53
google::protobuf::Message ProtoMsg
Definition: TransportTypes.hh:62
std::shared_ptr< IRepHandler > IRepHandlerPtr
Definition: TransportTypes.hh:82
bool Request(const std::string &_topic, const T1 &_req, std::function< void(const T2 &_rep, const bool _result)> &_cb)
Request a new service using a non-blocking call.
Definition: Node.hh:400
bool Advertise(const std::string &_topic, void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result), C *_obj, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:343
bool Subscribe(const std::string &_topic, std::function< void(const T &_msg)> &_cb)
Subscribe to a topic registering a callback.
Definition: Node.hh:165
with the service response.
Definition: RepHandler.hh:102
std::unique_ptr< transport::NodePrivate > dataPtr
Definition: Node.hh:666
A class that allows a client to communicate with other peers.
Definition: Node.hh:64
ignition/transport/Publisher.hh
Definition: Publisher.hh:175
ignition/transport/Publisher.hh
Definition: Publisher.hh:264
bool Advertise(const std::string &_topic, void(*_cb)(const T1 &_req, T2 &_rep, bool &_result), const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:254
It creates a subscription handler for a specific protobuf message.
Definition: SubscriptionHandler.hh:102
bool Request(const std::string &_topic, const T1 &_req, const unsigned int &_timeout, T2 &_rep, bool &_result)
Request a new service using a blocking call.
Definition: Node.hh:511
Definition: AdvertiseOptions.hh:25
bool Request(const std::string &_topic, const T1 &_req, void(C::*_cb)(const T2 &_rep, const bool _result), C *_obj)
Request a new service using a non-blocking call.
Definition: Node.hh:486
IGNITION_VISIBLE void waitForShutdown()
Block the current thread until a SIGINT or SIGTERM is received.
bool Subscribe(const std::string &_topic, void(C::*_cb)(const T &_msg), C *_obj)
Subscribe to a topic registering a callback.
Definition: Node.hh:216