Revision b7855e8f6890f719ae4199f415307b2fb8262329 authored by Unknown Author on 20 November 2004, 06:50:43 UTC, committed by Unknown Author on 20 November 2004, 06:50:43 UTC
git-svn-id: http://root.cern.ch/svn/root/tags/v4-01-04@10581 27541ba8-7e3a-0410-8455-c3a389f83636
1 parent aaad88d
TXConnectionMgr.cxx
// @(#)root/netx:$Name: $:$Id: TXConnectionMgr.cxx,v 1.2 2004/08/20 22:16:33 rdm Exp $
// Author: Alvise Dorigo, Fabrizio Furano
/*************************************************************************
* Copyright (C) 1995-2004, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// TXConnectionMgr //
// //
// Authors: Alvise Dorigo, Fabrizio Furano //
// INFN Padova, 2003 //
// //
// The Connection Manager handles socket communications for TXNetFile //
// action: connect, disconnect, read, write. It is a static object of //
// the TXNetFile class such that within a single application multiple //
// TXNetFile objects share the same connection manager. //
// The connection manager maps multiple logical connections on a single //
// physical connection. //
// There is one and only one logical connection per client (XNTetFile //
// object), and one and only one physical connection per server:port. //
// Thus multiple TXNetFile objects withing a given application share //
// the same physical TCP channel to communicate with the server. //
// This reduces the time overhead for socket creation and reduces also //
// the server load due to handling many sockets. //
// //
//////////////////////////////////////////////////////////////////////////
#include "TEnv.h"
#include "TXConnectionMgr.h"
#include "TXDebug.h"
#include "TXMessage.h"
#include "TError.h"
#include "TXMutexLocker.h"
extern TEnv *gEnv;
ClassImp(TXConnectionMgr);
TXConnectionMgr *TXConnectionMgr::fgInstance = 0;
//_____________________________________________________________________________
extern "C" void * GarbageCollectorThread(void * arg)
{
// Function executed in the garbage collector thread
int i;
TXConnectionMgr *thisObj = (TXConnectionMgr *)arg;
TThread::SetCancelDeferred();
TThread::SetCancelOn();
while (1) {
TThread::CancelPoint();
thisObj->GarbageCollect();
for (i = 0; i < 10; i++) {
TThread::CancelPoint();
gSystem->Sleep(200);
}
}
TThread::Exit();
return 0;
}
//_____________________________________________________________________________
TXConnectionMgr* TXConnectionMgr::Instance()
{
// Create unique instance of the connection manager
if(fgInstance == 0) {
fgInstance = new TXConnectionMgr;
if(!fgInstance) {
gSystem->Error("Instance",
"Fatal ERROR *** Object creation with new failed !"
" Probable system resources exhausted.");
gSystem->Abort();
}
}
return fgInstance;
}
//_____________________________________________________________________________
void TXConnectionMgr::Reset()
{
// Reset the connection manager
delete(fgInstance);
fgInstance = 0;
}
//____________________________________________________
TXConnectionMgr::TXConnectionMgr()
{
// TXConnectionMgr constructor.
// Creates a Connection Manager object.
// Starts the garbage collector thread.
// Initialization of lock mutex
fMutex = new TMutex(kTRUE);
if (!fMutex)
Info("TXConnectionMgr", "Can't create mutex: out of system resources");
fThreadHandler = 0;
// Garbage collector thread creation void *(*start_routine, void*)
if (gEnv->GetValue("XNet.StartGarbageCollectorThread",
DFLT_STARTGARBAGECOLLECTORTHREAD)) {
// The type of the thread func makes it a detached thread
fThreadHandler = new TThread((TThread::VoidFunc_t) GarbageCollectorThread,
this);
if (!fThreadHandler)
Info("TXConnectionMgr",
"Can't create garbage collector thread: out of system resources");
fThreadHandler->Run();
}
else
if(DebugLevel() >= TXDebug::kHIDEBUG)
Info("TXConnectionMgr",
"Explicitly requested not to start the garbage collector"
" thread. Are you sure?");
}
//_____________________________________________________________________________
TXConnectionMgr::~TXConnectionMgr()
{
// Deletes mutex locks, stops garbage collector thread.
UInt_t i=0;
{
TXMutexLocker mtx(fMutex);
for (i = 0; i < fLogVec.size(); i++)
if (fLogVec[i]) Disconnect(i, kFALSE);
}
if (fThreadHandler) {
fThreadHandler->Kill();
//fThreadHandler->Join();
}
GarbageCollect();
SafeDelete(fMutex);
delete(fgInstance);
}
//_____________________________________________________________________________
void TXConnectionMgr::GarbageCollect()
{
// Get rid of unused physical connections. 'Unused' means not used for a
// TTL time from any logical one. The TTL depends on the kind of remote
// server. For a load balancer the TTL is very high, while for a data server
// is quite small.
// Mutual exclusion on the vectors and other vars
{
TXMutexLocker mtx(fMutex);
// We cycle all the physical connections
for (unsigned short int i = 0; i < fPhyVec.size(); i++) {
// If a single physical connection has no linked logical connections,
// then we kill it if its TTL has expired
if ( fPhyVec[i] && (GetPhyConnectionRefCount(fPhyVec[i]) <= 0) &&
fPhyVec[i]->ExpiredTTL() ) {
if (DebugLevel() >= TXDebug::kDUMPDEBUG)
Info("GarbageCollect", "Purging physical connection %d", i);
// Wait until the physical connection is unlocked (it may be in use by
// slow processes)
fPhyVec[i]->Disconnect();
SafeDelete(fPhyVec[i]);
fPhyVec[i] = 0;
if (DebugLevel() >= TXDebug::kHIDEBUG)
Info("GarbageCollect", "Purged physical connection %d", i);
}
}
}
}
//_____________________________________________________________________________
short int TXConnectionMgr::Connect(TString RemoteAddress,
Int_t TcpPort, Int_t TcpWindowSize)
{
// Connects to the remote server:
// - Looks for an existing physical connection already bound to
// RemoteAddress:TcpPort;
// - If needed, creates a TCP channel to RemoteAddress:TcpPort
// (this is a physical connection);
// - Creates a logical connection and binds it to the previous
// created physical connection;
// - Returns the logical connection ID. Every client will use this
// ID to deal with the server.
TXLogConnection *logconn;
TXPhyConnection *phyconn;
short int newid;
Bool_t phyfound;
// First we get a new logical connection object
if (DebugLevel() >= TXDebug::kHIDEBUG)
Info("Connect", "Creating a logical connection...");
logconn = new TXLogConnection();
if (!logconn) {
Error("Connect","Fatal ERROR *** Object creation with new failed !"
" Probable system resources exhausted.");
gSystem->Abort();
}
if(DebugLevel() >= TXDebug::kDUMPDEBUG)
Info("Connect", "Getting lock...");
{
TXMutexLocker mtx(fMutex);
// If we already have a physical connection to that host:port,
// then we use that
phyfound = kFALSE;
if (DebugLevel() >= TXDebug::kHIDEBUG)
Info("Connect",
"Looking for an available physical connection for address [%s:%d]",
RemoteAddress.Data(), TcpPort);
for (unsigned short int i=0; i < fPhyVec.size(); i++) {
if (fPhyVec[i] && fPhyVec[i]->IsValid() &&
fPhyVec[i]->IsPort(TcpPort) && fPhyVec[i]->IsAddress(RemoteAddress)) {
// We link that physical connection to the new logical connection
fPhyVec[i]->Touch();
logconn->SetPhyConnection( fPhyVec[i] );
phyfound = kTRUE;
break;
}
}
}
if (!phyfound) {
if (DebugLevel() >= TXDebug::kHIDEBUG)
Info("Connect",
"Physical connection not found. Creating a new one...");
// If not already present, then we must build a new physical connection,
// and try to connect it
// While we are trying to connect, the mutex must be unlocked
// Note that at this point logconn is a pure local instance, so it
// does not need to be protected by mutex
phyconn = new TXPhyConnection(this);
if (!phyconn) {
Error("Connect","Fatal ERROR *** Object creation with new failed !"
" Probable system resources exhausted.");
gSystem->Abort();
}
if (phyconn && phyconn->Connect(RemoteAddress, TcpPort, TcpWindowSize)) {
logconn->SetPhyConnection(phyconn);
if (DebugLevel() >= TXDebug::kHIDEBUG)
Info("Connect", "New physical connection to server [%s:%d]"
" succesfully created.",
RemoteAddress.Data(), TcpPort);
} else
return -1;
}
// Now, we are connected to the host desired.
// The physical connection can be old or newly created
{
TXMutexLocker mtx(fMutex);
// Then, if needed, we push the physical connection into its vector
if (!phyfound)
fPhyVec.push_back(phyconn);
// Then we push the logical connection into its vector
fLogVec.push_back(logconn);
// Its ID is its position inside the vector, we must return it later
newid = fLogVec.size()-1;
// Now some debug log
if (DebugLevel() >= TXDebug::kHIDEBUG) {
Int_t logCnt = 0, phyCnt = 0;
for (unsigned short int i=0; i < fPhyVec.size(); i++)
if (fPhyVec[i])
phyCnt++;
for (unsigned short int i=0; i < fLogVec.size(); i++)
if (fLogVec[i])
logCnt++;
Info("Connect",
"LogConn: size:%d, count:%d - PhyConn: size:%d, count:%d",
fLogVec.size(), logCnt, phyCnt, fPhyVec.size());
}
}
return newid;
}
//_____________________________________________________________________________
void TXConnectionMgr::Disconnect(short int LogConnectionID,
Bool_t ForcePhysicalDisc)
{
// Deletes a logical connection.
// Also deletes the related physical one if ForcePhysicalDisc=TRUE.
if (DebugLevel() >= TXDebug::kDUMPDEBUG)
Info("Disconnect", "Getting lock...");
{
TXMutexLocker mtx(fMutex);
if ((UInt_t(LogConnectionID) >= fLogVec.size()) || (!fLogVec[LogConnectionID])) {
Error("Disconnect", "Destroying nonexistent logconnid %d.", LogConnectionID);
return;
}
if (ForcePhysicalDisc) {
// We disconnect the phyconn
// But it will be removed by the garbagecollector as soon as possible
// Note that here we cannot destroy the phyconn, since there can be other
// logconns pointing to it the phyconn will be removed when there are no
// more logconns pointing to it
fLogVec[LogConnectionID]->GetPhyConnection()->SetTTL(0);
fLogVec[LogConnectionID]->GetPhyConnection()->Disconnect();
}
fLogVec[LogConnectionID]->GetPhyConnection()->Touch();
SafeDelete(fLogVec[LogConnectionID]);
fLogVec[LogConnectionID] = 0;
if (DebugLevel() >= TXDebug::kDUMPDEBUG)
Info("Disconnect", "Unlocking...");
}
}
//_____________________________________________________________________________
Int_t TXConnectionMgr::ReadRaw(short int LogConnectionID, void *buffer,
Int_t BufferLength, ESendRecvOptions opt)
{
// Read BufferLength bytes from the logical connection LogConnectionID
TXLogConnection *logconn;
logconn = GetConnection(LogConnectionID);
if (logconn) {
if (DebugLevel() >= TXDebug::kDUMPDEBUG)
Info("ReadRaw", "Reading from logical connection %d",
LogConnectionID);
return logconn->ReadRaw(buffer, BufferLength, opt);
}
else {
Info("ReadRaw", "There's not a logical connection with id=%d",
LogConnectionID);
return(-1);
}
}
//_____________________________________________________________________________
TXMessage *TXConnectionMgr::ReadMsg(short int LogConnectionID, ESendRecvOptions opt)
{
TXLogConnection *logconn;
TXMessage *mex;
logconn = GetConnection(LogConnectionID);
if (logconn) {
// if (DebugLevel() >= TXDebug::kDUMPDEBUG)
// Info("ReadMsg", "Reading from logical connection %d",
// LogConnectionID);
}
// Parametric asynchronous stuff.
// If we are going Sync, then we must build the message here,
// otherwise the messages come directly from the queue
if ( !gEnv->GetValue("XNet.GoAsynchronous", DFLT_GOASYNC) ) {
// We get a new message directly from the socket.
// The message gets inserted inside the phyconn queue
// This line of code will be moved to a reader thread inside TXPhyConnection
// Timeouts must not be ignored here, indeed they are an error
// because we are waiting for a message that must come quickly
mex = logconn->GetPhyConnection()->BuildXMessage(opt, kFALSE, kFALSE);
}
else {
// Now we get the message from the queue, with the timeouts needed
mex = logconn->GetPhyConnection()->ReadXMessage(LogConnectionID);
}
// Return the message unmarshalled to ClientServerCmd
return mex;
}
//_____________________________________________________________________________
Int_t TXConnectionMgr::WriteRaw(short int LogConnectionID, const void *buffer,
Int_t BufferLength, ESendRecvOptions opt)
{
// Write BufferLength bytes into the logical connection LogConnectionID
TXLogConnection *logconn;
logconn = GetConnection(LogConnectionID);
if (logconn) {
if (DebugLevel() >= TXDebug::kDUMPDEBUG)
Info("WriteRaw", "Writing %d bytes to logical connection %d.",
BufferLength, LogConnectionID);
return logconn->WriteRaw(buffer, BufferLength, opt);
}
else {
Info("WriteRaw", "There's not a logical connection with id=%d",
LogConnectionID);
return(-1);
}
}
//_____________________________________________________________________________
TXLogConnection *TXConnectionMgr::GetConnection(short int LogConnectionID)
{
// Return a logical connection object that has LogConnectionID as its ID.
TXLogConnection *res;
{
TXMutexLocker mtx(fMutex);
res = fLogVec[LogConnectionID];
}
return res;
}
//_____________________________________________________________________________
short int TXConnectionMgr::GetPhyConnectionRefCount(TXPhyConnection *PhyConn)
{
// Return the number of logical connections bound to the physical one 'PhyConn'
int cnt = 0;
{
TXMutexLocker mtx(fMutex);
for (unsigned short int i = 0; i < fLogVec.size(); i++)
if ( fLogVec[i] && (fLogVec[i]->GetPhyConnection() == PhyConn) ) cnt++;
}
return cnt;
}
//_____________________________________________________________________________
Bool_t TXConnectionMgr::ProcessUnsolicitedMsg(TXUnsolicitedMsgSender *sender,
TXMessage *unsolmsg)
{
// We are here if an unsolicited response comes from a physical connection
// The response comes in the form of an TXMessage *, that must NOT be
// destroyed after processing. It is destroyed by the first sender.
// Remember that we are in a separate thread, since unsolicited responses
// are asynchronous by nature.
Info("Write", "Processing unsolicited response");
// Local processing ....
// Now we propagate the message to the interested objects.
// In our architecture, the interested objects are the objects which
// self-registered in the logical connections belonging to the Phyconn
// which threw the event
// So we throw the evt towards each logical connection
{
TXMutexLocker mtx(fMutex);
for (unsigned short int i = 0; i < fLogVec.size(); i++)
if ( fLogVec[i] && (fLogVec[i]->GetPhyConnection() == sender) ) {
fLogVec[i]->ProcessUnsolicitedMsg(sender, unsolmsg);
}
}
return kTRUE;
}
Computing file changes ...