Revision 5f91868ceea5767402390b63c36a3e1958e4fa31 authored by Dhruba Borthakur on 06 November 2012, 00:51:55 UTC, committed by Dhruba Borthakur on 06 November 2012, 00:51:55 UTC
Conflicts:
	db/version_set.cc
	util/options.cc
2 parent s 81f735d + cb7a002
Raw File
env_hdfs.h
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// Copyright (c) 2012 Facebook. All rights reserved.

#ifndef LEVELDB_HDFS_FILE_H
#define LEVELDB_HDFS_FILE_H

#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <iostream>
#include "leveldb/env.h"
#include "leveldb/status.h"

#ifdef USE_HDFS
#include "hdfs/hdfs.h"

namespace leveldb {

static const std::string kProto = "hdfs://";
static const std::string pathsep = "/";

// Thrown during execution when there is an issue with the supplied
// arguments.
class HdfsUsageException : public std::exception { };

// A simple exception that indicates something went wrong that is not
// recoverable.  The intention is for the message to be printed (with
// nothing else) and the process terminate.
class HdfsFatalException : public std::exception {
public:
  explicit HdfsFatalException(const std::string& s) : what_(s) { }
  virtual ~HdfsFatalException() throw() { }
  virtual const char* what() const throw() {
    return what_.c_str();
  }
private:
  const std::string what_;
};

//
// The HDFS environment for leveldb. This class overrides all the
// file/dir access methods and delegates the thread-mgmt methods to the
// default posix environment.
//
class HdfsEnv : public Env {

 public:
  HdfsEnv(const std::string& fsname) : fsname_(fsname) {
    posixEnv = Env::Default();
    fileSys_ = connectToPath(fsname_);
  }

  virtual ~HdfsEnv() {
    fprintf(stderr, "Destroying HdfsEnv::Default()\n");
    hdfsDisconnect(fileSys_);
  }

  virtual Status NewSequentialFile(const std::string& fname,
                                   SequentialFile** result);

  virtual Status NewRandomAccessFile(const std::string& fname,
                                     RandomAccessFile** result);

  virtual Status NewWritableFile(const std::string& fname,
                                 WritableFile** result);

  virtual bool FileExists(const std::string& fname);

  virtual Status GetChildren(const std::string& path,
                             std::vector<std::string>* result);

  virtual Status DeleteFile(const std::string& fname);

  virtual Status CreateDir(const std::string& name);

  virtual Status DeleteDir(const std::string& name);

  virtual Status GetFileSize(const std::string& fname, uint64_t* size);

  virtual Status RenameFile(const std::string& src, const std::string& target);

  virtual Status LockFile(const std::string& fname, FileLock** lock);

  virtual Status UnlockFile(FileLock* lock);

  virtual Status NewLogger(const std::string& fname, Logger** result);

  virtual void Schedule( void (*function)(void* arg), void* arg) {
    posixEnv->Schedule(function, arg);
  }

  virtual void StartThread(void (*function)(void* arg), void* arg) {
    posixEnv->StartThread(function, arg);
  }

  virtual Status GetTestDirectory(std::string* path) {
    return posixEnv->GetTestDirectory(path);
  }

  virtual uint64_t NowMicros() {
    return posixEnv->NowMicros();
  }

  virtual void SleepForMicroseconds(int micros) {
    posixEnv->SleepForMicroseconds(micros);
  }

  virtual Status GetHostName(char* name, uint64_t len) {
    return posixEnv->GetHostName(name, len);
  }

  virtual Status GetCurrentTime(int64_t* unix_time) {
    return posixEnv->GetCurrentTime(unix_time);
  }

  virtual Status GetAbsolutePath(const std::string& db_path,
      std::string* output_path) {
    return posixEnv->GetAbsolutePath(db_path, output_path);
  }

  virtual void SetBackgroundThreads(int number) {
    posixEnv->SetBackgroundThreads(number);
  }

  virtual std::string TimeToString(uint64_t number) {
    return posixEnv->TimeToString(number);
  }

  static uint64_t gettid() {
    assert(sizeof(pthread_t) <= sizeof(uint64_t));
    return (uint64_t)pthread_self();
  }

 private:
  std::string fsname_;  // string of the form "hdfs://hostname:port/"
  hdfsFS fileSys_;      //  a single FileSystem object for all files
  Env*  posixEnv;       // This object is derived from Env, but not from
                        // posixEnv. We have posixnv as an encapsulated
                        // object here so that we can use posix timers,
                        // posix threads, etc.

  /**
   * If the URI is specified of the form hdfs://server:port/path, 
   * then connect to the specified cluster
   * else connect to default.
   */
  hdfsFS connectToPath(const std::string& uri) {
    if (uri.empty()) {
      return NULL;
    }
    if (uri.find(kProto) != 0) {
      // uri doesn't start with hdfs:// -> use default:0, which is special
      // to libhdfs.
      return hdfsConnectNewInstance("default", 0);
    }
    const std::string hostport = uri.substr(kProto.length());

    std::vector <std::string> parts;
    split(hostport, ':', parts);
    if (parts.size() != 2) {
      throw HdfsFatalException("Bad uri for hdfs " + uri);
    }
    // parts[0] = hosts, parts[1] = port/xxx/yyy
    std::string host(parts[0]);
    std::string remaining(parts[1]);

    int rem = remaining.find(pathsep);
    std::string portStr = (rem == 0 ? remaining :
                           remaining.substr(0, rem));
    
    tPort port;
    port = atoi(portStr.c_str());
    if (port == 0) {
      throw HdfsFatalException("Bad host-port for hdfs " + uri);
    }
    hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
    return fs;
  }

  void split(const std::string &s, char delim, 
             std::vector<std::string> &elems) {
    elems.clear();
    size_t prev = 0;
    size_t pos = s.find(delim);
    while (pos != std::string::npos) {
      elems.push_back(s.substr(prev, pos));
      prev = pos + 1;
      pos = s.find(delim, prev);
    }
    elems.push_back(s.substr(prev, s.size()));
  }
};

}  // namespace leveldb

#else // USE_HDFS


namespace leveldb {

static const Status notsup;

class HdfsEnv : public Env {

 public:
  HdfsEnv(const std::string& fsname) {
    fprintf(stderr, "You have not build leveldb with HDFS support\n");
    fprintf(stderr, "Please see hdfs/README for details\n");
    throw new std::exception();
  }

  virtual ~HdfsEnv() {
  }

  virtual Status NewSequentialFile(const std::string& fname,
                                   SequentialFile** result);

  virtual Status NewRandomAccessFile(const std::string& fname,
                                     RandomAccessFile** result){ return notsup;}

  virtual Status NewWritableFile(const std::string& fname,
                                 WritableFile** result){return notsup;}

  virtual bool FileExists(const std::string& fname){return false;}

  virtual Status GetChildren(const std::string& path,
                             std::vector<std::string>* result){return notsup;}

  virtual Status DeleteFile(const std::string& fname){return notsup;}

  virtual Status CreateDir(const std::string& name){return notsup;}

  virtual Status DeleteDir(const std::string& name){return notsup;}

  virtual Status GetFileSize(const std::string& fname, uint64_t* size){return notsup;}

  virtual Status RenameFile(const std::string& src, const std::string& target){return notsup;}

  virtual Status LockFile(const std::string& fname, FileLock** lock){return notsup;}

  virtual Status UnlockFile(FileLock* lock){return notsup;}

  virtual Status NewLogger(const std::string& fname, Logger** result){return notsup;}

  virtual void Schedule( void (*function)(void* arg), void* arg) {}

  virtual void StartThread(void (*function)(void* arg), void* arg) {}

  virtual Status GetTestDirectory(std::string* path) {return notsup;}

  virtual uint64_t NowMicros() {return 0;}

  virtual void SleepForMicroseconds(int micros) {}

  virtual Status GetHostName(char* name, uint64_t len) {return notsup;}

  virtual Status GetCurrentTime(int64_t* unix_time) {return notsup;}

  virtual Status GetAbsolutePath(const std::string& db_path,
      std::string* outputpath) {return notsup;}

  virtual void SetBackgroundThreads(int number) {}

  virtual std::string TimeToString(uint64_t number) { return "";}
};
}

#endif // USE_HDFS

#endif // LEVELDB_HDFS_FILE_H
back to top