Example hdfs_write_revised.c File

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include "hdfs.h" 
#include <errno.h>
#include <fcntl.h>

tSize
writeAtOffset(hdfsFS fs, hdfsFile writeFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize ret = 0;

  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }

  ret = hdfsSeek(fs, writeFile, offset);
  if (!ret) {
    //hdfsWrite will return -1 if ret != number of bytes asked to 
    //be written.
    ret = hdfsWrite(fs, writeFile, buffer, bufferSize);
    if (ret < 0) {
      fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
    }
  } else {
    fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
  }

  if (ret < 0) {
    //hdfsWrite does a flush in case of an error, explicit flush
    //is not required.
    hdfsCloseFile(fs, writeFile);    
  }

  //Current offset within the file will be positioned at (offset + writeBytes)th byte.
  return ret;
}

tSize
positionalWrite(hdfsFS fs, hdfsFile writeFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize writeBytes = 0;

  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }

  writeBytes = hdfsPwrite(fs, writeFile, offset, buffer, bufferSize);
  if (writeBytes < 0) { 
    fprintf(stderr, "hdfsPwrite failed with error %d \n", errno);
    hdfsCloseFile(fs, writeFile);
  }

  //Current offset within the file will not be advanced if hdfsPwrite is used
  return writeBytes;
}

int
writeLength(hdfsFS fs, hdfsFile writeFile, char *buffer, tSize bufferSize, tSize writeSize)
{
  tSize writeBytes = 0;
  tSize ret = 0;
  uint64_t totalWrite = 0;

  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }

  if (writeSize == 0) {
    return 0;
  }

  for (writeBytes=0; writeBytes<writeSize; writeBytes+=bufferSize) {
    ret = hdfsWrite(fs, writeFile, (void*)buffer, bufferSize);
    if (ret > 0) {
      totalWrite += ret; 
    } else {   
      fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
      hdfsCloseFile(fs, writeFile);
      return -1;
    }
  }

  return 0; 
}

int 
main(int argc, char **argv) 
{
  if (argc != 4) {
    fprintf(stderr, "Usage: hdfs_write <filename> <filesize> <buffersize>\n");
    exit(-1);
  }

  int err = hdfsSetRpcTimeout(30);
  if (err) {
    fprintf(stderr, "Oops! Failed to set rpc timeout!\n");
    exit(-1);
  }

  hdfsFS fs = hdfsConnect("default", 0);
  if (!fs) {
    fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
    exit(-1);
  } 

  const char* rfile = argv[1];
  tSize fileSize = strtoul(argv[2], NULL, 10);
  tSize bufferSize = strtoul(argv[3], NULL, 10);

  //O_WRONLY creates the file if the file doesn't exist.
  //O_WRONLY truncates the file if the file exists.
  //O_WRONLY | O_APPEND will preserve the contents of the file if the file exists.
  hdfsFile writeFile = hdfsOpenFile(fs, rfile, O_WRONLY, 0, 0, 0);
  if (!writeFile) {
    fprintf(stderr, "Failed to open %s for writing!\n", rfile);
    exit(-2);
  }

  char* buffer = malloc(sizeof(char) * bufferSize);
  if(buffer == NULL) {
    fprintf(stderr, "Failed to allocate memory!\n");
    return -2;
  }

  int i;
  for (i=0; i<bufferSize; i++) {
    buffer[i] = 'a' + i%26;
  }

  //Write entire file from the beginning    
  int ret = writeLength(fs, writeFile, buffer, bufferSize, fileSize);
  if (ret < 0) {
    goto done;
  }

  //Write file at a particular offset 
  //In this case, we are writing from offset 0
  tSize writeBytes = writeAtOffset(fs, writeFile, 0, buffer, bufferSize);
  if (writeBytes < 0) {
    goto done;
  }

  //Write file at a particular offset using positional write
  //In this case, write from offset 20 
  writeBytes = positionalWrite(fs, writeFile, 20, buffer, bufferSize);
  if (writeBytes < 0) {
    goto done;
  }

  hdfsCloseFile(fs, writeFile);
done:
  free(buffer);
  hdfsDisconnect(fs);

  return 0;
}

/**
 * vim: ts=4: sw=4: et:
 */