XRootD
XrdClPostMaster.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XrdCl/XrdClPostMaster.hh"
22 #include "XrdCl/XrdClMessage.hh"
23 #include "XrdCl/XrdClConstants.hh"
24 #include "XrdCl/XrdClDefaultEnv.hh"
25 #include "XrdCl/XrdClPoller.hh"
27 #include "XrdCl/XrdClJobManager.hh"
29 #include "XrdCl/XrdClChannel.hh"
30 #include "XrdCl/XrdClConstants.hh"
31 #include "XrdCl/XrdClLog.hh"
33 
34 #include "XrdSys/XrdSysPthread.hh"
35 
36 namespace XrdCl
37 {
38  struct ConnErrJob : public Job
39  {
41  std::function<void( const URL&, const XRootDStatus& )> handler) : url( url ),
42  status( status ),
43  handler( handler )
44  {
45  }
46 
47  void Run( void *arg )
48  {
49  handler( url, status );
50  delete this;
51  }
52 
55  std::function<void( const URL&, const XRootDStatus& )> handler;
56  };
57 
59  {
60  PostMasterImpl() : pPoller( 0 ), pInitialized( false ), pRunning( false )
61  {
62  Env *env = DefaultEnv::GetEnv();
63  int workerThreads = DefaultWorkerThreads;
64  env->GetInt( "WorkerThreads", workerThreads );
65 
66  pTaskManager = new TaskManager();
67  pJobManager = new JobManager(workerThreads);
68  }
69 
71  {
72  delete pPoller;
73  delete pTaskManager;
74  delete pJobManager;
75  }
76 
77  typedef std::map<std::string, Channel*> ChannelMap;
78 
84  bool pRunning;
86 
88  std::unique_ptr<Job> pOnConnJob;
89  std::function<void( const URL&, const XRootDStatus& )> pOnConnErrCB;
90 
92  };
93 
94  //----------------------------------------------------------------------------
95  // Constructor
96  //----------------------------------------------------------------------------
98  {
99  }
100 
101  //----------------------------------------------------------------------------
102  // Destructor
103  //----------------------------------------------------------------------------
105  {
106  }
107 
108  //----------------------------------------------------------------------------
109  // Initializer
110  //----------------------------------------------------------------------------
112  {
113  Env *env = DefaultEnv::GetEnv();
114  std::string pollerPref = DefaultPollerPreference;
115  env->GetString( "PollerPreference", pollerPref );
116 
117  pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
118 
119  if( !pImpl->pPoller )
120  return false;
121 
122  bool st = pImpl->pPoller->Initialize();
123 
124  if( !st )
125  {
126  delete pImpl->pPoller;
127  return false;
128  }
129 
130  pImpl->pJobManager->Initialize();
131  pImpl->pInitialized = true;
132  return true;
133  }
134 
135  //----------------------------------------------------------------------------
136  // Finalizer
137  //----------------------------------------------------------------------------
139  {
140  //--------------------------------------------------------------------------
141  // Clean up the channels
142  //--------------------------------------------------------------------------
143  if( !pImpl->pInitialized )
144  return true;
145 
146  pImpl->pInitialized = false;
147  pImpl->pJobManager->Finalize();
148  PostMasterImpl::ChannelMap::iterator it;
149 
150  for( it = pImpl->pChannelMap.begin(); it != pImpl->pChannelMap.end(); ++it )
151  delete it->second;
152 
153  pImpl->pChannelMap.clear();
154  return pImpl->pPoller->Finalize();
155  }
156 
157  //----------------------------------------------------------------------------
158  // Start the post master
159  //----------------------------------------------------------------------------
161  {
162  if( !pImpl->pInitialized )
163  return false;
164 
165  if( !pImpl->pPoller->Start() )
166  return false;
167 
168  if( !pImpl->pTaskManager->Start() )
169  {
170  pImpl->pPoller->Stop();
171  return false;
172  }
173 
174  if( !pImpl->pJobManager->Start() )
175  {
176  pImpl->pPoller->Stop();
177  pImpl->pTaskManager->Stop();
178  return false;
179  }
180 
181  pImpl->pRunning = true;
182  return true;
183  }
184 
185  //----------------------------------------------------------------------------
186  // Stop the postmaster
187  //----------------------------------------------------------------------------
189  {
190  if( !pImpl->pInitialized || !pImpl->pRunning )
191  return true;
192 
193  if( !pImpl->pJobManager->Stop() )
194  return false;
195  if( !pImpl->pPoller->Stop() )
196  return false;
197  if( !pImpl->pTaskManager->Stop() )
198  return false;
199  pImpl->pRunning = false;
200  return true;
201  }
202 
203  //----------------------------------------------------------------------------
204  // Reinitialize after fork
205  //----------------------------------------------------------------------------
207  {
208  return true;
209  }
210 
211  //----------------------------------------------------------------------------
212  // Send the message asynchronously
213  //----------------------------------------------------------------------------
215  Message *msg,
216  MsgHandler *handler,
217  bool stateful,
218  time_t expires )
219  {
220  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
221  Channel *channel = GetChannel( url );
222 
223  if( !channel )
225 
226  return channel->Send( msg, handler, stateful, expires );
227  }
228 
230  Message *msg,
231  MsgHandler *inHandler )
232  {
234  VirtualRedirector *redirector = registry.Get( url );
235  if( !redirector )
236  return Status( stError, errInvalidOp );
237  return redirector->HandleRequest( msg, inHandler );
238  }
239 
240  //----------------------------------------------------------------------------
241  // Query the transport handler
242  //----------------------------------------------------------------------------
244  uint16_t query,
245  AnyObject &result )
246  {
247  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
248  Channel *channel = 0;
249  {
250  XrdSysMutexHelper scopedLock2( pImpl->pChannelMapMutex );
251  PostMasterImpl::ChannelMap::iterator it =
252  pImpl->pChannelMap.find( url.GetChannelId() );
253  if( it == pImpl->pChannelMap.end() )
254  return Status( stError, errInvalidOp );
255  channel = it->second;
256  }
257 
258  if( !channel )
259  return Status( stError, errNotSupported );
260 
261  return channel->QueryTransport( query, result );
262  }
263 
264  //----------------------------------------------------------------------------
265  // Register channel event handler
266  //----------------------------------------------------------------------------
268  ChannelEventHandler *handler )
269  {
270  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
271  Channel *channel = GetChannel( url );
272 
273  if( !channel )
274  return Status( stError, errNotSupported );
275 
276  channel->RegisterEventHandler( handler );
277  return Status();
278  }
279 
280  //----------------------------------------------------------------------------
281  // Remove a channel event handler
282  //----------------------------------------------------------------------------
284  ChannelEventHandler *handler )
285  {
286  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
287  Channel *channel = GetChannel( url );
288 
289  if( !channel )
290  return Status( stError, errNotSupported );
291 
292  channel->RemoveEventHandler( handler );
293  return Status();
294  }
295 
296  //------------------------------------------------------------------------
297  // Get the task manager object used by the post master
298  //------------------------------------------------------------------------
300  {
301  return pImpl->pTaskManager;
302  }
303 
304  //------------------------------------------------------------------------
305  // Get the job manager object used by the post master
306  //------------------------------------------------------------------------
308  {
309  return pImpl->pJobManager;
310  }
311 
312  //------------------------------------------------------------------------
313  // Shut down a channel
314  //------------------------------------------------------------------------
316  {
317  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
318  PostMasterImpl::ChannelMap::iterator it =
319  pImpl->pChannelMap.find( url.GetChannelId() );
320 
321  if( it == pImpl->pChannelMap.end() )
322  return Status( stError, errInvalidOp );
323 
324  it->second->ForceDisconnect();
325  delete it->second;
326  pImpl->pChannelMap.erase( it );
327 
328  return Status();
329  }
330 
332  {
333  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
334  PostMasterImpl::ChannelMap::iterator it =
335  pImpl->pChannelMap.find( url.GetChannelId() );
336 
337  if( it == pImpl->pChannelMap.end() )
338  return Status( stError, errInvalidOp );
339 
340  it->second->ForceReconnect();
341  return Status();
342  }
343 
344  //------------------------------------------------------------------------
345  // Get the number of connected data streams
346  //------------------------------------------------------------------------
347  uint16_t PostMaster::NbConnectedStrm( const URL &url )
348  {
349  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
350  Channel *channel = GetChannel( url );
351  if( !channel ) return 0;
352  return channel->NbConnectedStrm();
353  }
354 
355  //------------------------------------------------------------------------
357  //------------------------------------------------------------------------
359  std::shared_ptr<Job> onConnJob )
360  {
361  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
362  Channel *channel = GetChannel( url );
363  if( !channel ) return;
364  channel->SetOnDataConnectHandler( onConnJob );
365  }
366 
367  //------------------------------------------------------------------------
369  //------------------------------------------------------------------------
370  void PostMaster::SetOnConnectHandler( std::unique_ptr<Job> onConnJob )
371  {
372  XrdSysMutexHelper lck( pImpl->pMtx );
373  pImpl->pOnConnJob = std::move( onConnJob );
374  }
375 
376  //------------------------------------------------------------------------
377  // Set the global connection error handler
378  //------------------------------------------------------------------------
379  void PostMaster::SetConnectionErrorHandler( std::function<void( const URL&, const XRootDStatus& )> handler )
380  {
381  XrdSysMutexHelper lck( pImpl->pMtx );
382  pImpl->pOnConnErrCB = std::move( handler );
383  }
384 
385  //------------------------------------------------------------------------
386  // Notify the global on-connect handler
387  //------------------------------------------------------------------------
389  {
390  XrdSysMutexHelper lck( pImpl->pMtx );
391  if( pImpl->pOnConnJob )
392  {
393  URL *ptr = new URL( url );
394  pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
395  }
396  }
397 
398  //------------------------------------------------------------------------
399  // Notify the global error connection handler
400  //------------------------------------------------------------------------
401  void PostMaster::NotifyConnErrHandler( const URL &url, const XRootDStatus &status )
402  {
403  XrdSysMutexHelper lck( pImpl->pMtx );
404  if( pImpl->pOnConnErrCB )
405  {
406  ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
407  pImpl->pJobManager->QueueJob( job, nullptr );
408  }
409  }
410 
411  //----------------------------------------------------------------------------
413  //----------------------------------------------------------------------------
414  void PostMaster::CollapseRedirect( const URL &alias, const URL &url )
415  {
416  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
417 
418  //--------------------------------------------------------------------------
419  // Get the passive channel
420  //--------------------------------------------------------------------------
421  PostMasterImpl::ChannelMap::iterator it =
422  pImpl->pChannelMap.find( alias.GetChannelId() );
423  Channel *passive = 0;
424  if( it != pImpl->pChannelMap.end() )
425  passive = it->second;
426  //--------------------------------------------------------------------------
427  // If the channel does not exist there's nothing to do
428  //--------------------------------------------------------------------------
429  else return;
430 
431  //--------------------------------------------------------------------------
432  // Check if this URL is eligible for collapsing
433  //--------------------------------------------------------------------------
434  if( !passive->CanCollapse( url ) ) return;
435 
436  //--------------------------------------------------------------------------
437  // Create the active channel
438  //--------------------------------------------------------------------------
440  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
441 
442  if( !trHandler )
443  {
444  Log *log = DefaultEnv::GetLog();
445  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
446  "protocol", url.GetProtocol().c_str() );
447  return;
448  }
449 
450  Log *log = DefaultEnv::GetLog();
451  log->Info( PostMasterMsg, "Label channel %s with alias %s.",
452  url.GetHostId().c_str(), alias.GetHostId().c_str() );
453 
454  Channel *active = new Channel( alias, pImpl->pPoller, trHandler,
455  pImpl->pTaskManager, pImpl->pJobManager, url );
456  pImpl->pChannelMap[alias.GetChannelId()] = active;
457 
458  //--------------------------------------------------------------------------
459  // The passive channel will be deallocated by TTL
460  //--------------------------------------------------------------------------
461  }
462 
463  //------------------------------------------------------------------------
464  // Decrement file object instance count bound to this channel
465  //------------------------------------------------------------------------
466  void PostMaster::DecFileInstCnt( const URL &url )
467  {
468  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
469  Channel *channel = GetChannel( url );
470 
471  if( !channel ) return;
472 
473  return channel->DecFileInstCnt();
474  }
475 
476  //------------------------------------------------------------------------
477  //true if underlying threads are running, false otherwise
478  //------------------------------------------------------------------------
480  {
481  return pImpl->pRunning;
482  }
483 
484  //----------------------------------------------------------------------------
485  // Get the channel
486  //----------------------------------------------------------------------------
487  Channel *PostMaster::GetChannel( const URL &url )
488  {
489  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
490  Channel *channel = 0;
491  PostMasterImpl::ChannelMap::iterator it = pImpl->pChannelMap.find( url.GetChannelId() );
492 
493  if( it == pImpl->pChannelMap.end() )
494  {
496  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
497 
498  if( !trHandler )
499  {
500  Log *log = DefaultEnv::GetLog();
501  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
502  "protocol", url.GetProtocol().c_str() );
503  return 0;
504  }
505 
506  channel = new Channel( url, pImpl->pPoller, trHandler, pImpl->pTaskManager,
507  pImpl->pJobManager );
508  pImpl->pChannelMap[url.GetChannelId()] = channel;
509  }
510  else
511  channel = it->second;
512  return channel;
513  }
514 }
A communication channel between the client and the server.
Definition: XrdClChannel.hh:49
uint16_t NbConnectedStrm()
Get the number of connected data streams.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
Definition: XrdClPoller.hh:87
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global on-connect handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
PostMaster()
Constructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object used by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition: XrdClURL.hh:31
std::string GetChannelId() const
Definition: XrdClURL.cc:494
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:94
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:113
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const char *const DefaultPollerPreference
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
XRootDStatus status
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
TaskManager * pTaskManager
std::map< std::string, Channel * > ChannelMap
std::unique_ptr< Job > pOnConnJob
XrdSysRWLock pDisconnectLock
XrdSysMutex pChannelMapMutex
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
Procedure execution status.
Definition: XrdClStatus.hh:115