hdfs_write_revised.c

This application demonstrates how to write to files by using the APIs hdfsWrite() and hdfsPwrite(): hdfs_write_revised.c

Before running this application:

  • Ensure that you have access either to a cluster running MapR-FS.
  • Ensure that a text-based file that you have access to exists on the cluster. Note the path to the file and the size of the file in bytes.
  • The content of the file will be deleted before the first write is performed by the application.
  • Decide on the length in bytes of a string to write to the file.

To build and run it, download it from this page to a MapR client or to a system with the mapr-core package installed. Then, modify the run.sh script in Building and Running C Applications on MapR-FS Clients to point to this sample application. Run the script and then run the application.

The application includes these header files:

  • stdio.h
  • hdfs.h
  • errno.h
  • fcntl.h

The APIs are defined in hdfs.h. The file fcntl.h defines the file-access flags.

The application performs the actions that are described in the following sections.

Takes a filename, file size, and buffer size as input

When you launch the application, provide the path and name of the file, the size of the file, and the number of bytes to write.

hdfs_write <filename> <filesize> <buffersize>

Sets an RPC timeout

hdfsSetRpcTimeout() is specific to the libMapRClient version of libhdfs and takes a value that is specified in seconds. The default is 99 seconds. If you change this value, set it either to 0 (which eliminates timeouts) or to a value greater than 30.

int err = hdfsSetRpcTimeout(30);
if (err) {
  fprintf(stderr, "Failed to set rpc timeout!\n");
  exit(-1);
}
Connects to a filesystem, using an API that is supported in the hadoop-2.x version of libhdfs

The application tries to connect to the first MapR-FS cluster that is specified in the mapr-clusters.conf file in the MAPR_HOME/conf directory on the client. After connecting to the filesystem, the application returns a handle to the filesystem.

hdfsFS fs = hdfsConnect("default", 0);
if (!fs) {
  fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
  exit(-1);
}
Stores the values of the arguments

The application stores the values of the arguments in a character array and in two variables of type tSize. This datatype is defined in hdfs.h and is a fixed-width, signed 32-byte integer type for storing the size of data for read or write operations.

const char* rfile = argv[1];
tSize fileSize = strtoul(argv[2], NULL, 10);
tSize bufferSize = strtoul(argv[3], NULL, 10);
Opens the file that you specified

The application opens the specified file, passing the following values to the hdfsOpenFile() function:

  • The handle to the filesystem
  • The name of the file, which you supplied when you launched the application.
  • A flag to indicate the mode in which to open the file. In this case, the flag is O_WRONLY. This flag creates the file if the file does not exist and truncates the file if the file does exist. If the file existed and you wanted to preserve the content of the file, you would specify O_WRONLY | O_APPEND for flag. These flags are defined in the header file fcntl.h.
  • The default chunk size for the directory in which the file is either located or will be created. This value is specified by the 0 in the last parameter.

Although there are two other parameters in the hdfsOpenFile() function – the fourth and fifth, the libMapRClient version of libhdfs ignores them.

hdfsFile writeFile = hdfsOpenFile(fs, rfile, O_WRONLY, 0, 0, 0);
if (!writeFile) {
   fprintf(stderr, "Failed to open %s for writing!\n", rfile);
   exit(-2);
}
Creates a buffer of the size that you specified and populates the buffer

At this point that the application, creates a string to populate the buffer. This is the data that the application will write.

char* buffer = malloc(sizeof(char) * bufferSize);
if(buffer == NULL) {
  fprintf(stderr, "Failed to allocate memory!\n");
  return -2;
}
int i;
  for (i=0; i<bufferSize; i++) {
  buffer[i] = 'a' + i%26;
}
Writes an entire file with hdfsWrite()

The application calls the function writeLength():

int ret = writeLength(fs, writeFile, buffer, bufferSize, fileSize);
if (ret < 0) {
  goto done;
}

This function writes the content of the buffer to the file, starting at offset 0.

int
writeLength(hdfsFS fs, hdfsFile writeFile, char *buffer, tSize bufferSize, tSize writeSize)
{
  tSize writeBytes = 0;
  tSize ret = 0;
  uint64_t totalWrite = 0;
  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }
  if (writeSize == 0) {
    return 0;
  }
  for (writeBytes=0; writeBytes<writeSize; writeBytes+=bufferSize) {
    ret = hdfsWrite(fs, writeFile, (void*)buffer, bufferSize);
    if (ret > 0) {
      totalWrite += ret; 
    } else {   
      fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
      hdfsCloseFile(fs, writeFile);
      return -1;
    }
  }
  return 0; 
}
Seeks an offset and writes from that offset with hdfsWrite()

The application next calls the function writeAtOffse():

  tSize writeBytes = writeAtOffset(fs, writeFile, 0, buffer, bufferSize);
if (writeBytes < 0) {
  goto done;
}

This function writes the content of the buffer to the file, starting at the specified offset. If the file already exists, the file is first truncated to this offset before the write operation begins. In this case, the specified offset is 0.

The difference between this function and the previous function is that, before writing, it calls hdfsSeek() to move to the specified offset in the file.

tSize
writeAtOffset(hdfsFS fs, hdfsFile writeFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize ret = 0;
  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }
  ret = hdfsSeek(fs, writeFile, offset);
  if (!ret) {
    //hdfsWrite will return -1 if ret != number of bytes asked to 
    //be written.
    ret = hdfsWrite(fs, writeFile, buffer, bufferSize);
    if (ret < 0) {
      fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
    }
  } else {
    fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
  }
  if (ret < 0) {
    //hdfsWrite does a flush in case of an error, explicit flush
    //is not required.
    hdfsCloseFile(fs, writeFile);    
  }
  //Current offset within the file will be positioned at (offset + writeBytes)th byte.
  return ret;
}
Performs a positional write with hdfsPwrite()

The application next calls the function positionalWrite():

writeBytes = positionalWrite(fs, writeFile, 20, buffer, bufferSize);
if (writeBytes < 0) {
  goto done;
}

This function writes the content of the buffer to the file, starting at the offset that you specify.

tSize
positionalWrite(hdfsFS fs, hdfsFile writeFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize writeBytes = 0;
  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }
  writeBytes = hdfsPwrite(fs, writeFile, offset, buffer, bufferSize);
  if (writeBytes < 0) { 
    fprintf(stderr, "hdfsPwrite failed with error %d \n", errno);
    hdfsCloseFile(fs, writeFile);
  }
  //Current offset within the file will not be advanced if hdfsPwrite is used
  return writeBytes;
}
Closes the file
hdfsCloseFile(fs, writeFile);
Frees the buffer
free(buffer);
Disconnects from the filesystem
hdfsDisconnect(fs);
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include "hdfs.h" 
#include <errno.h>
#include <fcntl.h>

tSize
writeAtOffset(hdfsFS fs, hdfsFile writeFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize ret = 0;

  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }

  ret = hdfsSeek(fs, writeFile, offset);
  if (!ret) {
    //hdfsWrite will return -1 if ret != number of bytes asked to 
    //be written.
    ret = hdfsWrite(fs, writeFile, buffer, bufferSize);
    if (ret < 0) {
      fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
    }
  } else {
    fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
  }

  if (ret < 0) {
    //hdfsWrite does a flush in case of an error, explicit flush
    //is not required.
    hdfsCloseFile(fs, writeFile);    
  }

  //Current offset within the file will be positioned at (offset + writeBytes)th byte.
  return ret;
}

tSize
positionalWrite(hdfsFS fs, hdfsFile writeFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize writeBytes = 0;

  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }

  writeBytes = hdfsPwrite(fs, writeFile, offset, buffer, bufferSize);
  if (writeBytes < 0) { 
    fprintf(stderr, "hdfsPwrite failed with error %d \n", errno);
    hdfsCloseFile(fs, writeFile);
  }

  //Current offset within the file will not be advanced if hdfsPwrite is used
  return writeBytes;
}

int
writeLength(hdfsFS fs, hdfsFile writeFile, char *buffer, tSize bufferSize, tSize writeSize)
{
  tSize writeBytes = 0;
  tSize ret = 0;
  uint64_t totalWrite = 0;

  if (fs == NULL || writeFile == NULL || buffer == NULL) {
    return -1;
  }

  if (writeSize == 0) {
    return 0;
  }

  for (writeBytes=0; writeBytes<writeSize; writeBytes+=bufferSize) {
    ret = hdfsWrite(fs, writeFile, (void*)buffer, bufferSize);
    if (ret > 0) {
      totalWrite += ret; 
    } else {   
      fprintf(stderr, "hdfsWrite failed with error %d \n", errno);
      hdfsCloseFile(fs, writeFile);
      return -1;
    }
  }

  return 0; 
}

int 
main(int argc, char **argv) 
{
  if (argc != 4) {
    fprintf(stderr, "Usage: hdfs_write <filename> <filesize> <buffersize>\n");
    exit(-1);
  }

  int err = hdfsSetRpcTimeout(30);
  if (err) {
    fprintf(stderr, "Oops! Failed to set rpc timeout!\n");
    exit(-1);
  }

  hdfsFS fs = hdfsConnect("default", 0);
  if (!fs) {
    fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
    exit(-1);
  } 

  const char* rfile = argv[1];
  tSize fileSize = strtoul(argv[2], NULL, 10);
  tSize bufferSize = strtoul(argv[3], NULL, 10);

  //O_WRONLY creates the file if the file doesn't exist.
  //O_WRONLY truncates the file if the file exists.
  //O_WRONLY | O_APPEND will preserve the contents of the file if the file exists.
  hdfsFile writeFile = hdfsOpenFile(fs, rfile, O_WRONLY, 0, 0, 0);
  if (!writeFile) {
    fprintf(stderr, "Failed to open %s for writing!\n", rfile);
    exit(-2);
  }

  char* buffer = malloc(sizeof(char) * bufferSize);
  if(buffer == NULL) {
    fprintf(stderr, "Failed to allocate memory!\n");
    return -2;
  }

  int i;
  for (i=0; i<bufferSize; i++) {
    buffer[i] = 'a' + i%26;
  }

  //Write entire file from the beginning    
  int ret = writeLength(fs, writeFile, buffer, bufferSize, fileSize);
  if (ret < 0) {
    goto done;
  }

  //Write file at a particular offset 
  //In this case, we are writing from offset 0
  tSize writeBytes = writeAtOffset(fs, writeFile, 0, buffer, bufferSize);
  if (writeBytes < 0) {
    goto done;
  }

  //Write file at a particular offset using positional write
  //In this case, write from offset 20 
  writeBytes = positionalWrite(fs, writeFile, 20, buffer, bufferSize);
  if (writeBytes < 0) {
    goto done;
  }

  hdfsCloseFile(fs, writeFile);
done:
  free(buffer);
  hdfsDisconnect(fs);

  return 0;
}

/**
 * vim: ts=4: sw=4: et:
 */