Revision 8e276806b2452d5e4f5f47ffaddb9a3da685ebc4 authored by Axel Naumann on 06 November 2023, 14:23:09 UTC, committed by Axel Naumann on 06 November 2023, 16:07:43 UTC
1 parent f2b8d33
Raw File
XrdProofdNetMgr.cxx
// @(#)root/proofd:$Id$
// Author: G. Ganis  Jan 2008

/*************************************************************************
 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/
#include "XrdProofdPlatform.h"

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// XrdProofdNetMgr                                                      //
//                                                                      //
// Authors: G. Ganis, CERN, 2008                                        //
//                                                                      //
// Manages connections between PROOF server daemons                     //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "XrdProofdNetMgr.h"

#include "XrdProofdXrdVers.h"
#ifndef ROOT_XrdFour
#  include "XpdSysDNS.h"
#else
#  include "XrdNet/XrdNetAddr.hh"
#endif
#include "Xrd/XrdBuffer.hh"
#include "XrdClient/XrdClientConst.hh"
#include "XrdClient/XrdClientEnv.hh"
#include "XrdClient/XrdClientMessage.hh"
#include "XrdClient/XrdClientUrlInfo.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "XrdSys/XrdSysPlatform.hh"

#include "XrdProofdClient.h"
#include "XrdProofdManager.h"
#include "XrdProofdProtocol.h"
#include "XrdProofdResponse.h"
#include "XrdProofWorker.h"

// Tracing utilities
#include "XrdProofdTrace.h"

#include <algorithm>
#include <limits>
#include <math.h>

////////////////////////////////////////////////////////////////////////////////
/// Send up a message from the server

int MessageSender(const char *msg, int len, void *arg)
{
   XrdProofdResponse *r = (XrdProofdResponse *) arg;
   if (r) {
      return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
   }
   return -1;
}

////////////////////////////////////////////////////////////////////////////////
/// Constructor

XrdProofdNetMgr::XrdProofdNetMgr(XrdProofdManager *mgr,
                                 XrdProtocol_Config *pi, XrdSysError *e)
   : XrdProofdConfig(pi->ConfigFN, e)
{
   fMgr = mgr;
   fResourceType = kRTNone;
   fPROOFcfg.fName = "";
   fPROOFcfg.fMtime = -1;
   fReloadPROOFcfg = 1;
   fDfltFallback = 0;
   fDfltWorkers.clear();
   fRegWorkers.clear();
   fWorkers.clear();
   fNodes.clear();
   fNumLocalWrks = XrdProofdAux::GetNumCPUs();
   fWorkerUsrCfg = 0;
   fRequestTO = 30;

   // Configuration directives
   RegisterDirectives();
}

////////////////////////////////////////////////////////////////////////////////
/// Register config directives

void XrdProofdNetMgr::RegisterDirectives()
{
   Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
   Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
   Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
   Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
}

////////////////////////////////////////////////////////////////////////////////
/// Destructor

XrdProofdNetMgr::~XrdProofdNetMgr()
{
   // Cleanup the worker lists
   // (the nodes list points to the same object, no cleanup is needed)
   std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
   while (w != fRegWorkers.end()) {
      delete *w;
      w = fRegWorkers.erase(w);
   }
   w = fDfltWorkers.begin();
   while (w != fDfltWorkers.end()) {
      delete *w;
      w = fDfltWorkers.erase(w);
   }
   fWorkers.clear();
}

////////////////////////////////////////////////////////////////////////////////
/// Run configuration and parse the entered config directives.
/// Return 0 on success, -1 on error

int XrdProofdNetMgr::Config(bool rcf)
{
   XPDLOC(NMGR, "NetMgr::Config")

   // Lock the method to protect the lists.
   XrdSysMutexHelper mhp(fMutex);

   // Cleanup the worker list
   std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
   while (w != fWorkers.end()) {
      delete *w;
      w = fWorkers.erase(w);
   }
   // Create a default master line
   XrdOucString mm("master ", 128);
   mm += fMgr->Host();
   mm += " port=";
   mm += fMgr->Port();
   fWorkers.push_back(new XrdProofWorker(mm.c_str()));

   // Run first the configurator
   if (XrdProofdConfig::Config(rcf) != 0) {
      XPDERR("problems parsing file ");
      return -1;
   }

   XrdOucString msg;
   msg = (rcf) ? "re-configuring" : "configuring";
   TRACE(ALL, msg);

   if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
      TRACE(ALL, "PROOF config file: " <<
            ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
      if (fResourceType == kRTStatic) {
         // Initialize the list of workers if a static config has been required
         // Default file path, if none specified
         bool dodefault = 1;
         if (fPROOFcfg.fName.length() > 0) {
            // Load file content in memory
            if (ReadPROOFcfg() == 0) {
               TRACE(ALL, "PROOF config file will " <<
                     ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
               dodefault = 0;
            } else {
               if (!fDfltFallback) {
                  XPDERR("unable to find valid information in PROOF config file " <<
                         fPROOFcfg.fName);
                  fPROOFcfg.fMtime = -1;
                  return 0;
               } else {
                  TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
               }
            }
         }
         if (dodefault) {
            // Use default
            CreateDefaultPROOFcfg();
         }
      } else if (fResourceType == kRTNone && fWorkers.size() <= 1) {
         // Nothing defined: use default
         CreateDefaultPROOFcfg();
      }

      // Find unique nodes
      FindUniqueNodes();
   }

   // For connection to the other xproofds we try only once
   XrdProofConn::SetRetryParam(1, 1);
   // Request Timeout
   EnvPutInt(NAME_REQUESTTIMEOUT, fRequestTO);

   // Notification
   XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
   TRACE(ALL, msg);

   // Done
   return 0;
}

////////////////////////////////////////////////////////////////////////////////
/// Update the priorities of the active sessions.

int XrdProofdNetMgr::DoDirective(XrdProofdDirective *d,
                                 char *val, XrdOucStream *cfg, bool rcf)
{
   XPDLOC(NMGR, "NetMgr::DoDirective")

   if (!d)
      // undefined inputs
      return -1;

   if (d->fName == "resource") {
      return DoDirectiveResource(val, cfg, rcf);
   } else if (d->fName == "adminreqto") {
      return DoDirectiveAdminReqTO(val, cfg, rcf);
   } else if (d->fName == "worker") {
      return DoDirectiveWorker(val, cfg, rcf);
   }

   TRACE(XERR, "unknown directive: " << d->fName);

   return -1;
}

////////////////////////////////////////////////////////////////////////////////
/// Indices (this will be used twice).

void XrdProofdNetMgr::BalanceNodesOrder()
{
   list<XrdProofWorker *>::const_iterator iter, iter2;
   list<XrdProofWorker *>::iterator iter3; // Not const, less efficient.
   // Map to store information of the balancer.
   map<XrdProofWorker *, BalancerInfo> info;
   // Node with minimum number of workers distinct to 1.
   unsigned int min = UINT_MAX;
   // Total number of nodes and per iteration assignments.
   unsigned int total = 0, total_perit = 0;
   // Number of iterations to get every node filled.
   unsigned int total_added = 0;
   // Temporary list to store the balanced configuration
   list<XrdProofWorker *> tempNodes;
   // Flag for the search and destroy loop.
   bool deleted;

   // Fill the information store with the first data (number of nodes).
   for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
      // The next code is not the same as this:
      //info[*iter].available = count(fWorkers.begin(), fWorkers.end(), *iter);
      // The previous piece of STL code only checks the pointer of the value
      // stored on the list, altough it is more efficient, it needs that repeated
      // nodes point to the same object. To allow hybrid configurations, we are
      // doing a 'manually' matching since statically configured nodes are
      // created in multiple ways.
      info[*iter].available = 0;
      for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); ++iter2) {
         if ((*iter)->Matches(*iter2)) {
            info[*iter].available++;
         }
      }
      info[*iter].added = 0;
      // Calculate the minimum greater than 1.
      if (info[*iter].available > 1 && info[*iter].available < min)
         min = info[*iter].available;
      // Calculate the totals.
      total += info[*iter].available;
   }

   // Now, calculate the number of workers to add in each iteration of the
   // round robin, scaling to the smaller number.
   for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
      if (info[*iter].available > 1) {
         info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
      } else {
         info[*iter].per_iteration = 1;
      }
      // Calculate the totals.
      total_perit += info[*iter].per_iteration;
   }

   // Since we are going to substitute the list, don't forget to recover the
   // default node at the fist time.
   tempNodes.push_back(fWorkers.front());

   // Finally, do the round robin assignment of nodes.
   // Stop when every node has its workers processed.
   while (total_added < total) {
      for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
         if (i->second.added < i->second.available) {
            // Be careful with the remainders (on prime number of nodes).
            unsigned int to_add = xrdmin(i->second.per_iteration,
                                        (i->second.available - i->second.added));
            // Then add the nodes.
            for (unsigned int j = 0; j < to_add; j++) {
               tempNodes.push_back(i->first);
            }
            i->second.added += to_add;
            total_added += to_add;
         }
      }
   }

   // Since we are mergin nodes in only one object, we must merge the current
   // sessions of the static nodes (that can be distinct objects that represent
   // the same node) and delete the orphaned objects. If, in the future, we can
   // assure that every worker has only one object in the list, this is not more
   // necessary. The things needed to change are the DoDirectiveWorker, it must
   // search for a node before inserting it, and in the repeat directive insert
   // the same node always. Also the default configuration methods (there are
   // two in this class) must be updated.
   iter3 = ++(fWorkers.begin());
   while (iter3 != fWorkers.end()) {
      deleted = false;
      // If the worker is not in the fWorkers list, we must process it. Note that
      // std::count() uses a plain comparison between values, in this case, we
      // are comparing pointers (numbers, at the end).
      if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
         // Search for an object that matches with this in the temp list.
         for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
            if ((*iter2)->Matches(*iter3)) {
               // Copy data and delete the *iter object.
               (*iter2)->MergeProofServs(*(*iter3));
               deleted = true;
               delete *iter3;
               fWorkers.erase(iter3++);
               break;
            }
         }
      }
      // Do not forget to increase the iterator.
      if (!deleted)
         ++iter3;
   }

   // Then, substitute the current fWorkers list with the balanced one.
   fWorkers = tempNodes;
}

////////////////////////////////////////////////////////////////////////////////
/// Process 'adminreqto' directive

int XrdProofdNetMgr::DoDirectiveAdminReqTO(char *val, XrdOucStream *cfg, bool)
{
   if (!val)
      // undefined inputs
      return -1;

   // Check deprecated 'if' directive
   if (fMgr->Host() && cfg)
      if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
         return 0;

   // Timeout on requested broadcasted to workers; there are 4 attempts,
   // so the real timeout is 4 x fRequestTO
   int to = strtol(val, 0, 10);
   fRequestTO = (to > 0) ? to : fRequestTO;
   return 0;
}

////////////////////////////////////////////////////////////////////////////////
/// Process 'resource' directive

int XrdProofdNetMgr::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
{
   XPDLOC(NMGR, "NetMgr::DoDirectiveResource")

   if (!val || !cfg)
      // undefined inputs
      return -1;

   if (!strcmp("static", val)) {
      // We just take the path of the config file here; the
      // rest is used by the static scheduler
      fResourceType = kRTStatic;
      while ((val = cfg->GetWord()) && val[0]) {
         XrdOucString s(val);
         if (s.beginswith("ucfg:")) {
            fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
         } else if (s.beginswith("reload:")) {
            fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
         } else if (s.beginswith("dfltfallback:")) {
            fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
         } else if (s.beginswith("wmx:")) {
         } else if (s.beginswith("selopt:")) {
         } else {
            // Config file
            fPROOFcfg.fName = val;
            if (fPROOFcfg.fName.beginswith("sm:")) {
               fPROOFcfg.fName.replace("sm:", "");
            }
            XrdProofdAux::Expand(fPROOFcfg.fName);
            // Make sure it exists and can be read
            if (access(fPROOFcfg.fName.c_str(), R_OK)) {
               if (errno == ENOENT) {
                  TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
               } else {
                  TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
                  fPROOFcfg.fName = "";
                  fPROOFcfg.fMtime = -1;
               }
            }
         }
      }
   }
   return 0;
}

////////////////////////////////////////////////////////////////////////////////
/// Process 'worker' directive

int XrdProofdNetMgr::DoDirectiveWorker(char *val, XrdOucStream *cfg, bool)
{
   XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")

   if (!val || !cfg)
      // undefined inputs
      return -1;

   // Lock the method to protect the lists.
   XrdSysMutexHelper mhp(fMutex);

   // Get the full line (w/o heading keyword)
   cfg->RetToken();
   XrdOucString wrd(cfg->GetWord());
   if (wrd.length() > 0) {
      // Build the line
      XrdOucString line;
      char rest[2048] = {0};
      cfg->GetRest((char *)&rest[0], 2048);
      XPDFORM(line, "%s %s", wrd.c_str(), rest);
      // Parse it now
      if (wrd == "master" || wrd == "node") {
         // Init a master instance
         XrdProofWorker *pw = new XrdProofWorker(line.c_str());
         if (pw->fHost.beginswith("localhost") ||
             pw->Matches(fMgr->Host())) {
            // Replace the default line (the first with what found in the file)
            XrdProofWorker *fw = fWorkers.front();
            fw->Reset(line.c_str());
         }
         SafeDelete(pw);
      } else {
         // How many lines like this?
         int nr = 1;
         int ir = line.find("repeat=");
         if (ir != STR_NPOS) {
            XrdOucString r(line, ir + strlen("repeat="));
            r.erase(r.find(' '));
            nr = r.atoi();
            if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
            TRACE(DBG, "found repeat = " << nr);
         }
         while (nr--) {
            // Build the worker object
            XrdProofdMultiStr mline(line.c_str());
            if (mline.IsValid()) {
               TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
               for (int i = 0; i < mline.N(); i++) {
                  TRACE(HDBG, "found token: " << mline.Get(i));
                  fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
               }
            } else {
               TRACE(DBG, "found line: " << line);
               fWorkers.push_back(new XrdProofWorker(line.c_str()));
            }
         }
      }
   }

   // Necessary for the balancer when Bonjour is enabled. Note that this balancer
   // can also be enabled with a static configuration. By this time is disabled
   // due to its experimental status.
   FindUniqueNodes();
   //BalanceNodesOrder();

   return 0;
}

////////////////////////////////////////////////////////////////////////////////
/// Broadcast a ctrlc interrupt
/// Return 0 on success, -1 on error

int XrdProofdNetMgr::BroadcastCtrlC(const char *usr)
{
   XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")

   int rc = 0;

   // Loop over unique nodes
   std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
   XrdProofWorker *w = 0;
   while (iw != fNodes.end()) {
      if ((w = *iw) && w->fType != 'M') {
         // Do not send it to ourselves
         bool us = (((w->fHost.find("localhost") != STR_NPOS ||
                      XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
                    (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
         if (!us) {
            // Create 'url'
            // We use the enforced username if specified in the config file; this is the case
            // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
            XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
            if (u.length() <= 0) u = fMgr->EffectiveUser();
            u += '@';
            u += w->fHost;
            if (w->fPort != -1) {
               u += ':';
               u += w->fPort;
            }
            TRACE(HDBG, "sending request to: "<<u);
            // Get a connection to the server
            XrdProofConn *conn = GetProofConn(u.c_str());
            if (conn && conn->IsValid()) {
               // Prepare request
               XPClientRequest reqhdr;
               memset(&reqhdr, 0, sizeof(reqhdr));
               conn->SetSID(reqhdr.header.streamid);
               reqhdr.proof.requestid = kXP_ctrlc;
               reqhdr.proof.sid = 0;
               reqhdr.proof.dlen = 0;
               // We need the right order
               if (XPD::clientMarshall(&reqhdr) != 0) {
                  TRACE(XERR, "problems marshalling request");
                  return -1;
               }
               if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
                  TRACE(XERR, "problems sending ctrl-c request to server " << u);
               }
               // Clean it up, to avoid leaving open tcp connection possibly going forever
               // into CLOSE_WAIT
               SafeDelete(conn);
            }
         } else {
            TRACE(DBG, "broadcast request for ourselves: ignore");
         }
      }
      // Next worker
      ++iw;
   }

   // Done
   return rc;
}

////////////////////////////////////////////////////////////////////////////////
/// Broadcast request to known potential sub-nodes.
/// Return 0 on success, -1 on error

int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
                               XrdProofdResponse *r, bool notify, int subtype)
{
   XPDLOC(NMGR, "NetMgr::Broadcast")

   unsigned int nok = 0;
   TRACE(REQ, "type: " << type);

   // Loop over unique nodes
   std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
   XrdProofWorker *w = 0;
   XrdClientMessage *xrsp = 0;
   while (iw != fNodes.end()) {
      if ((w = *iw) && w->fType != 'M') {
         // Do not send it to ourselves
         bool us = (((w->fHost.find("localhost") != STR_NPOS ||
                      XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
                    (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
         if (!us) {
            // Create 'url'
            // We use the enforced username if specified in the config file; this is the case
            // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
            XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
            if (u.length() <= 0) u = fMgr->EffectiveUser();
            u += '@';
            u += w->fHost;
            if (w->fPort != -1) {
               u += ':';
               u += w->fPort;
            }
            // Type of server
            int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
                          : (kXR_int32) kXPD_Worker;
            TRACE(HDBG, "sending request to " << u);
            // Send request
            if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
               TRACE(XERR, "problems sending request to " << u);
            } else {
               nok++;
            }
            // Cleanup answer
            SafeDelete(xrsp);
         } else {
            TRACE(DBG, "broadcast request for ourselves: ignore");
         }
      }
      // Next worker
      ++iw;
   }

   // Done
   return (nok == fNodes.size()) ? 0 : -1;
}

////////////////////////////////////////////////////////////////////////////////
/// Get a XrdProofConn for url; create a new one if not available

XrdProofConn *XrdProofdNetMgr::GetProofConn(const char *url)
{
   XrdProofConn *p = 0;

   // If not found create a new one
   XrdOucString buf = " Manager connection from ";
   buf += fMgr->Host();
   buf += "|ord:000";
   char m = 'A'; // log as admin

   {
      XrdSysMutexHelper mhp(fMutex);
      p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
   }
   if (p && !(p->IsValid())) SafeDelete(p);

   // Done
   return p;
}

////////////////////////////////////////////////////////////////////////////////
/// Broadcast request to known potential sub-nodes.
/// Return 0 on success, -1 on error

XrdClientMessage *XrdProofdNetMgr::Send(const char *url, int type,
                                        const char *msg, int srvtype,
                                        XrdProofdResponse *r, bool notify,
                                        int subtype)
{
   XPDLOC(NMGR, "NetMgr::Send")

   XrdClientMessage *xrsp = 0;
   TRACE(REQ, "type: " << type);

   if (!url || strlen(url) <= 0)
      return xrsp;

   // Get a connection to the server
   XrdProofConn *conn = GetProofConn(url);

   bool ok = 1;
   if (conn && conn->IsValid()) {
      XrdOucString notifymsg("Send: ");
      // Prepare request
      XPClientRequest reqhdr;
      const void *buf = 0;
      char **vout = 0;
      memset(&reqhdr, 0, sizeof(reqhdr));
      conn->SetSID(reqhdr.header.streamid);
      reqhdr.header.requestid = kXP_admin;
      reqhdr.proof.int1 = type;
      switch (type) {
         case kROOTVersion:
            notifymsg += "change-of-ROOT version request to ";
            notifymsg += url;
            notifymsg += " msg: ";
            notifymsg += msg;
            reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
            buf = (msg) ? (const void *)msg : buf;
            break;
         case kCleanupSessions:
            notifymsg += "cleanup request to ";
            notifymsg += url;
            notifymsg += " for user: ";
            notifymsg += msg;
            reqhdr.proof.int2 = (kXR_int32) srvtype;
            reqhdr.proof.sid = -1;
            reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
            buf = (msg) ? (const void *)msg : buf;
            break;
         case kExec:
            notifymsg += "exec ";
            notifymsg += subtype;
            notifymsg += "request for ";
            notifymsg += msg;
            reqhdr.proof.int2 = (kXR_int32) subtype;
            reqhdr.proof.sid = -1;
            reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
            buf = (msg) ? (const void *)msg : buf;
            break;
         default:
            ok = 0;
            TRACE(XERR, "invalid request type " << type);
            break;
      }

      // Notify the client that we are sending the request
      if (r && notify)
         r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());

      // Activate processing of unsolicited responses
      conn->SetAsync(conn, &MessageSender, (void *)r);

      // Send over
      if (ok)
         xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");

      // Deactivate processing of unsolicited responses
      conn->SetAsync(0, 0, (void *)0);

      // Print error msg, if any
      if (r && !xrsp && conn->GetLastErr()) {
         XrdOucString cmsg = url;
         cmsg += ": ";
         cmsg += conn->GetLastErr();
         r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
      }
      // Clean it up, to avoid leaving open tcp connection possibly going forever
      // into CLOSE_WAIT
      SafeDelete(conn);

   } else {
      TRACE(XERR, "could not open connection to " << url);
      if (r) {
         XrdOucString cmsg = "failure attempting connection to ";
         cmsg += url;
         r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
      }
   }

   // Done
   return xrsp;
}

////////////////////////////////////////////////////////////////////////////////
/// Check if 'host' is this local host. If checkport is true,
/// matching of the local port with the one implied by host is also checked.
/// Return 1 if 'local', 0 otherwise

bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
{
   XPDLOC(NMGR, "NetMgr::IsLocal")

   int rc = 0;
   if (host && strlen(host) > 0) {
      XrdClientUrlInfo uu(host);
      if (uu.Port <= 0) uu.Port = 1093;
      // Fully qualified name
#ifndef ROOT_XrdFour
      char *fqn = XrdSysDNS::getHostName(uu.Host.c_str());
#else
      XrdNetAddr aNA;
      aNA.Set(uu.Host.c_str());
      char *fqn = (char *) aNA.Name();
#endif
      TRACE(HDBG, "fqn: '"<<fqn<<"' mgrh: '"<<fMgr->Host()<<"'");
      if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "127.0.0.1") ||
                  !strcmp(fMgr->Host(), fqn))) {
         if (!checkport || (uu.Port == fMgr->Port()))
            rc = 1;
      }
#ifndef ROOT_XrdFour
      SafeFree(fqn);
#endif
   }
   // Done
   return rc;
}

////////////////////////////////////////////////////////////////////////////////
/// Process a readbuf request

int XrdProofdNetMgr::ReadBuffer(XrdProofdProtocol *p)
{
   XPDLOC(NMGR, "NetMgr::ReadBuffer")

   int rc = 0;
   XPD_SETRESP(p, "ReadBuffer");

   XrdOucString emsg;

   // Unmarshall the data
   //
   kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
   int len = ntohl(p->Request()->readbuf.len);

   // Find out the file name
   char *file = 0;
   char *filen = 0;
   char *pattern = 0;
   int dlen = p->Request()->header.dlen;
   int grep = ntohl(p->Request()->readbuf.int1);
   int blen = dlen;
   bool local = 0;
   if (dlen > 0 && p->Argp()->buff) {
      file = new char[dlen+1];
      memcpy(file, p->Argp()->buff, dlen);
      file[dlen] = 0;
      // Check if local
      XrdClientUrlInfo ui(file);
      if (ui.Host.length() > 0) {
         // Check locality
         local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
         if (local) {
            memcpy(file, ui.File.c_str(), ui.File.length());
            file[ui.File.length()] = 0;
            blen = ui.File.length();
            TRACEP(p, DBG, "file is LOCAL");
         }
      }
      // If grep, extract the pattern
      if (grep > 0) {
         // 'grep' operation: len is the length of the 'pattern' to be grepped
         pattern = new char[len + 1];
         int j = blen - len;
         int i = 0;
         while (j < blen)
            pattern[i++] = file[j++];
         pattern[i] = 0;
         filen = strdup(file);
         filen[blen - len] = 0;
         TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
      }
   } else {
      emsg = "file name not found";
      TRACEP(p, XERR, emsg);
      response->Send(kXR_InvalidRequest, emsg.c_str());
      return 0;
   }
   if (grep) {
      TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
             ", pattern: " << pattern);
   } else {
      TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
   }

   // Get the buffer
   int lout = len;
   char *buf = 0;
   if (local) {
      if (grep > 0) {
         // Grep local file
         lout = blen; // initial length
         buf = ReadBufferLocal(filen, pattern, lout, grep);
      } else {
         // Read portion of local file
         buf = ReadBufferLocal(file, ofs, lout);
      }
   } else {
      // Read portion of remote file
      XrdClientUrlInfo u(file);
      if (u.User.length() <= 0)
         u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
      buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
   }

   bool sent = 0;
   if (!buf) {
      if (lout > 0) {
         if (grep > 0) {
            if (TRACING(DBG)) {
               XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
               TRACEP(p, DBG, emsg);
            }
            response->Send();
            sent = 1;
         } else {
            XPDFORM(emsg, "could not read buffer from %s %s",
                    (local) ? "local file " : "remote file ", file);
            TRACEP(p, XERR, emsg);
            response->Send(kXR_InvalidRequest, emsg.c_str());
            sent = 1;
         }
      } else {
         // Just got an empty buffer
         if (TRACING(DBG)) {
            emsg = "nothing found in ";
            emsg += (grep > 0) ? filen : file;
            TRACEP(p, DBG, emsg);
         }
      }
   }

   // Send back to user
   if (!sent)
      response->Send(buf, lout);

   // Cleanup
   SafeFree(buf);
   SafeDelArray(file);
   SafeFree(filen);
   SafeDelArray(pattern);

   // Done
   return 0;
}

////////////////////////////////////////////////////////////////////////////////
/// Locate the exact file path allowing for wildcards '*' in the file name.
/// In case of success, returns 0 and fills file wity the first matching instance.
/// Return -1 if no matching pat is found.

int XrdProofdNetMgr::LocateLocalFile(XrdOucString &file)
{
   XPDLOC(NMGR, "NetMgr::LocateLocalFile")

   // If no wild cards or empty, nothing to do
   if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;

   // Locate the file name and the dir
   XrdOucString fn, dn;
   int isl = file.rfind('/');
   if (isl != STR_NPOS) {
      fn.assign(file, isl + 1, -1);
      dn.assign(file, 0, isl);
   } else {
      fn = file;
      dn = "./";
   }

   XrdOucString emsg;
   // Scan the dir
   DIR *dirp = opendir(dn.c_str());
   if (!dirp) {
      XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
      TRACE(XERR, emsg.c_str());
      return -1;
   }
   struct dirent *ent = 0;
   XrdOucString sent;
   while ((ent = readdir(dirp))) {
      if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
         continue;
      // Check the match
      sent = ent->d_name;
      if (sent.matches(fn.c_str()) > 0) break;
      sent = "";
   }
   closedir(dirp);

   // If found fill a new output
   if (sent.length() > 0) {
      XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
      return 0;
   }

   // Not found
   return -1;
}

////////////////////////////////////////////////////////////////////////////////
/// Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the
/// returned buffer must be freed by the caller.
/// Wild cards '*' are allowed in the file name of 'path'; the first matching
/// instance is taken.
/// Returns 0 in case of error.

char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
{
   XPDLOC(NMGR, "NetMgr::ReadBufferLocal")

   XrdOucString emsg;
   TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);

   // Check input
   if (!path || strlen(path) <= 0) {
      TRACE(XERR, "path undefined!");
      return (char *)0;
   }

   // Locate the path resolving wild cards
   XrdOucString spath(path);
   if (LocateLocalFile(spath) != 0) {
      TRACE(XERR, "path cannot be resolved! (" << path << ")");
      return (char *)0;
   }
   const char *file = spath.c_str();

   // Open the file in read mode
   int fd = open(file, O_RDONLY);
   if (fd < 0) {
      emsg = "could not open ";
      emsg += file;
      TRACE(XERR, emsg);
      return (char *)0;
   }

   // Size of the output
   struct stat st;
   if (fstat(fd, &st) != 0) {
      emsg = "could not get size of file with stat: errno: ";
      emsg += (int)errno;
      TRACE(XERR, emsg);
      close(fd);
      return (char *)0;
   }
   off_t ltot = st.st_size;

   // Estimate offsets of the requested range
   // Start from ...
   kXR_int64 start = ofs;
   off_t fst = (start < 0) ? ltot + start : start;
   fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
   // End at ...
   kXR_int64 end = fst + len;
   off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end  : ltot);
   TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);

   // Number of bytes to be read
   len = lst - fst;

   // Output buffer
   char *buf = (char *)malloc(len + 1);
   if (!buf) {
      emsg = "could not allocate enough memory on the heap: errno: ";
      emsg += (int)errno;
      XPDERR(emsg);
      close(fd);
      return (char *)0;
   }

   // Reposition, if needed
   if (fst >= 0)
      lseek(fd, fst, SEEK_SET);

   int left = len;
   int pos = 0;
   int nr = 0;
   do {
      while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
         errno = 0;
      if (nr < 0) {
         TRACE(XERR, "error reading from file: errno: " << errno);
         break;
      }

      // Update counters
      pos += nr;
      left -= nr;

   } while (nr > 0 && left > 0);

   // Termination
   buf[len] = 0;
   TRACE(HDBG, "read " << nr << " bytes: " << buf);

   // Close file
   close(fd);

   // Done
   return buf;
}

////////////////////////////////////////////////////////////////////////////////
/// Grep lines matching 'pat' form 'path'; the returned buffer (length in 'len')
/// must be freed by the caller.
/// Wild cards '*' are allowed in the file name of 'path'; the first matching
/// instance is taken.
/// Returns 0 in case of error.

char *XrdProofdNetMgr::ReadBufferLocal(const char *path,
                                       const char *pat, int &len, int opt)
{
   XPDLOC(NMGR, "NetMgr::ReadBufferLocal")

   XrdOucString emsg;
   TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);

   // Check input
   if (!path || strlen(path) <= 0) {
      TRACE(XERR, "file path undefined!");
      return (char *)0;
   }

   // Locate the path resolving wild cards
   XrdOucString spath(path);
   if (LocateLocalFile(spath) != 0) {
      TRACE(XERR, "path cannot be resolved! (" << path << ")");
      return (char *)0;
   }
   const char *file = spath.c_str();

   // Size of the output
   struct stat st;
   if (stat(file, &st) != 0) {
      emsg = "could not get size of file with stat: errno: ";
      emsg += (int)errno;
      TRACE(XERR, emsg);
      return (char *)0;
   }
   off_t ltot = st.st_size;

   // The grep command
   char *cmd = 0;
   int lcmd = 0;
   if (pat && strlen(pat) > 0) {
      lcmd = strlen(pat) + strlen(file) + 20;
      cmd = new char[lcmd];
      if (opt == 1) {
         snprintf(cmd, lcmd, "grep %s %s", pat, file);
      } else if (opt == 2) {
         snprintf(cmd, lcmd, "grep -v %s %s", pat, file);
      } else if (opt == 3) {
         snprintf(cmd, lcmd, "cat %s | %s", file, pat);
      } else { // should not be here
         snprintf(cmd, lcmd, "cat %s", file);
      }
   } else {
      lcmd = strlen(file) + 10;
      cmd = new char[lcmd];
      snprintf(cmd, lcmd, "cat %s", file);
   }
   TRACE(DBG, "cmd: " << cmd);

   // Execute the command in a pipe
   FILE *fp = popen(cmd, "r");
   if (!fp) {
      emsg = "could not run '";
      emsg += cmd;
      emsg += "'";
      TRACE(XERR, emsg);
      delete[] cmd;
      return (char *)0;
   }
   delete[] cmd;

   // Read line by line
   len = 0;
   char *buf = 0;
   char line[2048];
   int bufsiz = 0, left = 0, lines = 0;
   while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
      // Parse the line
      int llen = strlen(line);
      ltot -= llen;
      lines++;
      // (Re-)allocate the buffer
      if (!buf || (llen > left)) {
         int dsiz = 100 * ((int)((len + llen) / lines) + 1);
         dsiz = (dsiz > llen) ? dsiz : llen;
         bufsiz += dsiz;
         buf = (char *)realloc(buf, bufsiz + 1);
         left += dsiz;
      }
      if (!buf) {
         emsg = "could not allocate enough memory on the heap: errno: ";
         emsg += (int)errno;
         TRACE(XERR, emsg);
         pclose(fp);
         return (char *)0;
      }
      // Add line to the buffer
      memcpy(buf + len, line, llen);
      len += llen;
      left -= llen;
      if (TRACING(HDBG))
         fprintf(stderr, "line: %s", line);
   }

   // Check the result and terminate the buffer
   if (buf) {
      if (len > 0) {
         buf[len] = 0;
      } else {
         free(buf);
         buf = 0;
      }
   }

   // Close file
   pclose(fp);

   // Done
   return buf;
}

////////////////////////////////////////////////////////////////////////////////
/// Send a read buffer request of length 'len' at offset 'ofs' for remote file
/// defined by 'url'; the returned buffer must be freed by the caller.
/// Returns 0 in case of error.

char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
                                        kXR_int64 ofs, int &len, int grep)
{
   XPDLOC(NMGR, "NetMgr::ReadBufferRemote")

   TRACE(REQ, "url: " << (url ? url : "undef") <<
         ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
         ", len: " << len << ", grep: " << grep);

   // Check input
   if (!file || strlen(file) <= 0) {
      TRACE(XERR, "file undefined!");
      return (char *)0;
   }
   XrdClientUrlInfo u(url);
   if (!url || strlen(url) <= 0) {
      // Use file as url
      u.TakeUrl(XrdOucString(file));
      if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
   }

   // Get a connection (logs in)
   XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());

   char *buf = 0;
   if (conn && conn->IsValid()) {
      // Prepare request
      XPClientRequest reqhdr;
      memset(&reqhdr, 0, sizeof(reqhdr));
      conn->SetSID(reqhdr.header.streamid);
      reqhdr.header.requestid = kXP_readbuf;
      reqhdr.readbuf.ofs = ofs;
      reqhdr.readbuf.len = len;
      reqhdr.readbuf.int1 = grep;
      reqhdr.header.dlen = strlen(file);
      const void *btmp = (const void *) file;
      char **vout = &buf;
      // Send over
      XrdClientMessage *xrsp =
         conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");

      // If positive answer
      if (xrsp && buf && (xrsp->DataLen() > 0)) {
         len = xrsp->DataLen();
      } else {
         if (xrsp && !(xrsp->IsError()))
            // The buffer was just empty: do not call it error
            len = 0;
         SafeFree(buf);
      }

      // Clean the message
      SafeDelete(xrsp);
      // Clean it up, to avoid leaving open tcp connection possibly going forever
      // into CLOSE_WAIT
      SafeDelete(conn);
   }

   // Done
   return buf;
}

////////////////////////////////////////////////////////////////////////////////
/// Get log paths from next tier; used in multi-master setups
/// Returns 0 in case of error.

char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
{
   XPDLOC(NMGR, "NetMgr::ReadLogPaths")

   TRACE(REQ, "url: " << (url ? url : "undef") <<
         ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);

   // Check input
   if (!url || strlen(url) <= 0) {
      TRACE(XERR, "url undefined!");
      return (char *)0;
   }

   // Get a connection (logs in)
   XrdProofConn *conn = GetProofConn(url);

   char *buf = 0;
   if (conn && conn->IsValid()) {
      // Prepare request
      XPClientRequest reqhdr;
      memset(&reqhdr, 0, sizeof(reqhdr));
      conn->SetSID(reqhdr.header.streamid);
      reqhdr.header.requestid = kXP_admin;
      reqhdr.proof.int1 = kQueryLogPaths;
      reqhdr.proof.int2 = isess;
      reqhdr.proof.sid = -1;
      reqhdr.header.dlen = msg ? strlen(msg) : 0;
      const void *btmp = (const void *) msg;
      char **vout = &buf;
      // Send over
      XrdClientMessage *xrsp =
         conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");

      // If positive answer
      if (xrsp && buf && (xrsp->DataLen() > 0)) {
         int len = xrsp->DataLen();
         buf = (char *) realloc((void *)buf, len + 1);
         if (buf)
            buf[len] = 0;
      } else {
         SafeFree(buf);
      }

      // Clean the message
      SafeDelete(xrsp);
      // Clean it up, to avoid leaving open tcp connection possibly going forever
      // into CLOSE_WAIT
      SafeDelete(conn);
   }

   // Done
   return buf;
}

////////////////////////////////////////////////////////////////////////////////
/// Get log paths from next tier; used in multi-master setups
/// Returns 0 in case of error.

char *XrdProofdNetMgr::ReadLogPaths(const char *msg, int isess)
{
   XPDLOC(NMGR, "NetMgr::ReadLogPaths")

   TRACE(REQ, "msg: " << (msg ? msg : "undef") << ", isess: " << isess);

   char *buf = 0, *pbuf = buf;
   int len = 0;
   // Loop over unique nodes
   std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
   XrdProofWorker *w = 0;
   while (iw != fNodes.end()) {
      if ((w = *iw)) {
         // Do not send it to ourselves
         bool us = (((w->fHost.find("localhost") != STR_NPOS ||
                      XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
                    (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
         if (!us) {
            // Create 'url'
            XrdOucString u = fMgr->EffectiveUser();
            u += '@';
            u += w->fHost;
            if (w->fPort != -1) {
               u += ':';
               u += w->fPort;
            }
            // Ask the node
            char *bmst = fMgr->NetMgr()->ReadLogPaths(u.c_str(), msg, isess);
            if (bmst) {
               len += strlen(bmst) + 1;
               buf = (char *) realloc((void *)buf, len);
               pbuf = buf + len - strlen(bmst) - 1;
               memcpy(pbuf, bmst, strlen(bmst) + 1);
               buf[len - 1] = 0;
               pbuf = buf + len;
               free(bmst);
            }
         } else {
            TRACE(DBG, "request for ourselves: ignore");
         }
      }
      // Next worker
      ++iw;
   }

   // Done
   return buf;
}

////////////////////////////////////////////////////////////////////////////////
/// Fill-in fWorkers for a localhost based on the number of
/// workers fNumLocalWrks.

void XrdProofdNetMgr::CreateDefaultPROOFcfg()
{
   XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")

   TRACE(DBG, "enter: local workers: " << fNumLocalWrks);

   // Lock the method to protect the lists.
   XrdSysMutexHelper mhp(fMutex);

   // Cleanup the worker list
   fWorkers.clear();
   // The first time we need to create the default workers
   if (fDfltWorkers.size() < 1) {
      // Create a default master line
      XrdOucString mm("master ", 128);
      mm += fMgr->Host();
      fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));

      // Create 'localhost' lines for each worker
      int nwrk = fNumLocalWrks;
      if (nwrk > 0) {
         mm = "worker localhost port=";
         mm += fMgr->Port();
         while (nwrk--) {
            fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
            TRACE(DBG, "added line: " << mm);
         }
      }
   }

   // Copy the list
   std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
   for (; w != fDfltWorkers.end(); ++w) {
      fWorkers.push_back(*w);
   }

   TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");

   // Find unique nodes
   FindUniqueNodes();

   // We are done
   return;
}

////////////////////////////////////////////////////////////////////////////////
/// Return the list of workers after having made sure that the info is
/// up-to-date

std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
{
   XPDLOC(NMGR, "NetMgr::GetActiveWorkers")

   XrdSysMutexHelper mhp(fMutex);

   if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
      // Check if there were any changes in the config file
      if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
         if (fDfltFallback) {
            // Use default settings
            CreateDefaultPROOFcfg();
            TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
         } else {
            TRACE(XERR, "unable to read the configuration file");
            return (std::list<XrdProofWorker *> *)0;
         }
      }
   }
   TRACE(DBG,  "returning list with " << fWorkers.size() << " entries");

   if (TRACING(HDBG)) Dump();

   return &fWorkers;
}

////////////////////////////////////////////////////////////////////////////////
/// Dump status

void XrdProofdNetMgr::Dump()
{
   const char *xpdloc = "NetMgr::Dump";

   XrdSysMutexHelper mhp(fMutex);

   XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
   XPDPRT("+ Active workers status");
   XPDPRT("+ Size: " << fWorkers.size());
   XPDPRT("+ ");

   std::list<XrdProofWorker *>::iterator iw;
   for (iw = fWorkers.begin(); iw != fWorkers.end(); ++iw) {
      XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
             " active sessions:" << (*iw)->Active());
   }
   XPDPRT("+ ");
   XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
}

////////////////////////////////////////////////////////////////////////////////
/// Return the list of unique nodes after having made sure that the info is
/// up-to-date

std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
{
   XPDLOC(NMGR, "NetMgr::GetNodes")

   XrdSysMutexHelper mhp(fMutex);

   if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
      // Check if there were any changes in the config file
      if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
         if (fDfltFallback) {
            // Use default settings
            CreateDefaultPROOFcfg();
            TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
         } else {
            TRACE(XERR, "unable to read the configuration file");
            return (std::list<XrdProofWorker *> *)0;
         }
      }
   }
   TRACE(DBG, "returning list with " << fNodes.size() << " entries");

   return &fNodes;
}

////////////////////////////////////////////////////////////////////////////////
/// Read PROOF config file and load the information in fWorkers.
/// NB: 'master' information here is ignored, because it is passed
///     via the 'xpd.workdir' and 'xpd.image' config directives

int XrdProofdNetMgr::ReadPROOFcfg(bool reset)
{
   XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")

   TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);

   // Lock the method to protect the lists.
   XrdSysMutexHelper mhp(fMutex);

   // Check inputs
   if (fPROOFcfg.fName.length() <= 0)
      return -1;

   // Get the modification time
   struct stat st;
   if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
      // If the file disappeared, reset the modification time so that we are sure
      // to reload it if it comes back
      if (errno == ENOENT) fPROOFcfg.fMtime = -1;
      if (!fDfltFallback) {
         TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
      } else {
         TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
      }
      return -1;
   }
   TRACE(DBG, "time of last modification: " << st.st_mtime);

   // File should be loaded only once
   if (st.st_mtime <= fPROOFcfg.fMtime)
      return 0;

   // Save the modification time
   fPROOFcfg.fMtime = st.st_mtime;

   // Open the defined path.
   FILE *fin = 0;
   if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
      if (fWorkers.size() > 1) {
         TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
         TRACE(XERR, "continuing with existing list of workers.");
         return 0;
      } else {
         return -1;
      }
   }

   if (reset) {
      // Cleanup the worker list
      fWorkers.clear();
   }

   // Add default a master line if not yet there
   if (fRegWorkers.size() < 1) {
      XrdOucString mm("master ", 128);
      mm += fMgr->Host();
      fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
   } else {
      // Deactivate all current active workers
      std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
      // Skip the master line
      ++w;
      for (; w != fRegWorkers.end(); ++w) {
         (*w)->fActive = 0;
      }
   }

   // Read now the directives
   int nw = 0;
   char lin[2048];
   while (fgets(lin, sizeof(lin), fin)) {
      // Skip empty lines
      int p = 0;
      while (lin[p++] == ' ') {
         ;
      }
      p--;
      if (lin[p] == '\0' || lin[p] == '\n')
         continue;

      // Skip comments
      if (lin[0] == '#')
         continue;

      // Remove trailing '\n';
      if (lin[strlen(lin)-1] == '\n')
         lin[strlen(lin)-1] = '\0';

      TRACE(DBG, "found line: " << lin);

      // Parse the line
      XrdProofWorker *pw = new XrdProofWorker(lin);

      const char *pfx[2] = { "master", "node" };
      if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
          !strncmp(lin, pfx[1], strlen(pfx[1]))) {
         // Init a master instance
         if (pw->fHost.beginswith("localhost") ||
             pw->Matches(fMgr->Host())) {
            // Replace the default line (the first with what found in the file)
            XrdProofWorker *fw = fRegWorkers.front();
            fw->Reset(lin);
         }
         // Ignore it
         SafeDelete(pw);
      } else {
         // Check if we have already it
         std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
         // Skip the master line
         ++w;
         bool haveit = 0;
         while (w != fRegWorkers.end()) {
            if (!((*w)->fActive)) {
               if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
                  (*w)->fActive = 1;
                  haveit = 1;
                  break;
               }
            }
            // Go to next
            ++w;
         }
         // If we do not have it, build a new worker object
         if (!haveit) {
            // Keep it
            fRegWorkers.push_back(pw);
         } else {
            // Drop it
            SafeDelete(pw);
         }
      }
   }

   // Copy the active workers
   std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
   while (w != fRegWorkers.end()) {
      if ((*w)->fActive) {
         fWorkers.push_back(*w);
         nw++;
      }
      ++w;
   }

   // Close files
   fclose(fin);

   // Find unique nodes
   if (reset)
      FindUniqueNodes();

   // We are done
   return ((nw == 0) ? -1 : 0);
}

////////////////////////////////////////////////////////////////////////////////
/// Scan fWorkers for unique nodes (stored in fNodes).
/// Return the number of unque nodes.
/// NB: 'master' information here is ignored, because it is passed
///     via the 'xpd.workdir' and 'xpd.image' config directives

int XrdProofdNetMgr::FindUniqueNodes()
{
   XPDLOC(NMGR, "NetMgr::FindUniqueNodes")

   TRACE(REQ, "# workers: " << fWorkers.size());

   // Cleanup the nodes list
   fNodes.clear();

   // Build the list of unique nodes (skip the master line);
   if (fWorkers.size() > 1) {
      std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
      ++w;
      for (; w != fWorkers.end(); ++w) if ((*w)->fActive) {
            bool add = 1;
            std::list<XrdProofWorker *>::iterator n;
            for (n = fNodes.begin() ; n != fNodes.end(); ++n) {
               if ((*n)->Matches(*w)) {
                  add = 0;
                  break;
               }
            }
            if (add)
               fNodes.push_back(*w);
         }
   }
   TRACE(REQ, "found " << fNodes.size() << " unique nodes");

   // We are done
   return fNodes.size();
}
back to top