Sunday 23 August 2015

HDFS Operations: Writing



The most important functions provided by any file system are reading and writing. After all, what's the point of having a storage house if you can't keep things in it and take them out?

Let's start with keeping things, i.e. writing:

Data Flow:
  • Creating a new File
    1. Client creates a file, i.e. makes an entry for a new file at its end. It's like deciding that you need to put something in the storage house.
    2. Client makes an RPC (Remote Procedure Call) to the namenode to create a new file with no blocks associated with it. It's like calling the manager of the storage house to ask for permission to store something.
    3. The namenode performs checks to make sure the file doesn't already exist and that the client has the right permissions to create it. If these checks pass, the namenode makes a record of the new file. In other words, the manager (or whoever does that) checks that you don't already have a space for the same item number, and that you are allowed to have a storage space (you could be a criminal... right????). If all checks are ok'ed, your package's entry goes into the register (let's say the manager uses an entry file or a paper book with receipts). From here on, the manager has no role in your storing of the item, as he has other calls to attend to. He is only responsible for allocating space.
  • Writing Data to it
    1. As the client writes data, it is first broken into blocks of a fixed configured size (default is 64 MB). Each block is in turn streamed as a sequence of small packets, which are added to a data queue. It's like the package you want to put in the storage house is collected from you, divided into fixed-size smaller packages, and queued for sending to the appropriate storage house unit.
    2. The namenode then allocates new blocks on appropriate datanodes, for the data and its replicas. (As I mentioned earlier, HDFS keeps replicas of each block to handle datanode failures.) It's like the manager checks the space in each storage house unit, allocates space for the packages, and gives you the addresses of the units.
    3. For each packet: (taking the default replication factor of 3)
      1. The packet is first written to the first datanode, which forwards it to the second datanode, which in turn forwards it to the third.
      2. The third datanode sends an acknowledgement to the second, which in turn sends one to the first, and the first sends the acknowledgement back; the write of that packet is then finished.
  • Failure Handling: if a datanode fails during the write process
    1. All packets waiting to be acknowledged are put back on the data queue, so they will be written again and nothing is lost.
    2. The current block is left as it is. Later, when the file system finds that the block is "under-replicated" (having fewer than the configured number of copies), it creates the missing replicas on its own.
    3. The failed datanode is reported to the namenode and removed from the pipeline for all in-progress writes. When the datanode recovers, the partially written block on it is deleted.
    4. As long as at least dfs.replication.min replicas (default 1) are written, the write is treated as successful; the block is then replicated asynchronously until it reaches its target replication factor.
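The pipeline-and-acknowledgement flow above can be sketched as a tiny simulation. To be clear, this is not real HDFS code: every class and function name below is invented for illustration, and a replication factor of 3 is assumed.

```python
# Toy simulation of the HDFS write pipeline described above.
# All names here are invented; real HDFS is far more involved.
from collections import deque

class Datanode:
    def __init__(self, name, alive=True):
        self.name = name
        self.alive = alive
        self.stored = []          # packets successfully written to this node

def pipeline_write(packet, pipeline):
    """Write one packet through the datanode pipeline.
    Returns the list of nodes that stored (and hence acknowledge) it."""
    acked = []
    for node in pipeline:
        if not node.alive:        # a failed node breaks the chain here
            break
        node.stored.append(packet)
        acked.append(node)
    # acks flow back third -> second -> first -> client
    return acked

nodes = [Datanode("dn1"), Datanode("dn2"), Datanode("dn3")]
data_queue = deque(["pkt-0", "pkt-1", "pkt-2"])
dfs_replication_min = 1           # mirrors the dfs.replication.min idea

while data_queue:
    pkt = data_queue.popleft()
    acked = pipeline_write(pkt, nodes)
    if len(acked) < dfs_replication_min:
        data_queue.appendleft(pkt)   # real HDFS retries on a new pipeline
    elif len(acked) < 3:
        print(pkt, "is under-replicated; namenode will re-replicate later")

nodes[2].alive = False               # dn3 fails mid-write
acked = pipeline_write("pkt-3", nodes)
print("pkt-3 acked by", [n.name for n in acked])
```

Note how the failed write still succeeds from the client's point of view: two replicas satisfy the minimum, and the third copy is left for the namenode's re-replication to fix later.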


API Version:
  • Creating a file:   
    1. The client creates the file by calling create() on DistributedFileSystem 
    2. DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it 
    3. The namenode performs various checks to make sure the file doesn’t already exist, and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException.
    4. The DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to. FSDataOutputStream wraps a DFSOutputStream, which handles communication with the datanodes and namenode. 
  • Writing Data to it:
    1. As the client writes data, DFSOutputStream splits it into packets and adds them to an internal data queue.
    2. Then DataStreamer asks the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas.
    3. For each packet:
      • The DataStreamer streams the packets to the first datanode in the pipeline, which stores the packet and forwards it to the second datanode in the pipeline and then the second datanode stores the packet and forwards it to the third.
      • DFSOutputStream also maintains an acknowledgement queue. The third datanode sends an acknowledgement to the second, which in turn sends one to the first, and the first sends it back; the write of that packet is then finished.
    4. When it has finished writing, the client calls close() on the stream. This flushes the remaining packets, waits for acknowledgements, and then contacts the namenode to signal that the file is complete.
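The create → write → close sequence above (the Java classes DistributedFileSystem, FSDataOutputStream and DFSOutputStream) can be mimicked with a toy Python analogue. None of the names below are the real Hadoop API; they are stand-ins to show the shape of the flow.

```python
# Toy analogue of the client-side write path: create -> write -> close.
# Invented names; the real Java API lives in org.apache.hadoop.fs.
from collections import deque

class ToyNamenode:
    def __init__(self):
        self.namespace = {}                  # path -> list of block ids

    def create(self, path):
        if path in self.namespace:
            raise IOError(f"{path} already exists")   # mirrors IOException
        self.namespace[path] = []            # new file, no blocks yet

    def allocate_block(self, path):
        block = f"blk_{len(self.namespace[path])}"
        self.namespace[path].append(block)
        return block

class ToyOutputStream:
    """Stands in for DFSOutputStream: queues packets, drains them on close."""
    PACKET_SIZE = 4                          # bytes per packet, tiny for demo

    def __init__(self, namenode, path):
        self.namenode, self.path = namenode, path
        self.data_queue, self.acked = deque(), []

    def write(self, data):
        # split the written bytes into fixed-size packets
        for i in range(0, len(data), self.PACKET_SIZE):
            self.data_queue.append(data[i:i + self.PACKET_SIZE])

    def close(self):
        # DataStreamer role: get a block, stream packets, collect acks
        self.namenode.allocate_block(self.path)
        while self.data_queue:
            self.acked.append(self.data_queue.popleft())

nn = ToyNamenode()
nn.create("/user/demo/file.txt")
out = ToyOutputStream(nn, "/user/demo/file.txt")
out.write(b"hello hdfs!")
out.close()
print(out.acked)        # the packets, acknowledged in send order
```

Calling nn.create() on the same path twice raises the error, just as step 3 describes for the real namenode.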

Hope it's all clear to you. Again, if something is not clear, just comment and ask.
One very important topic related to file writing in HDFS is "Replica Placement".
I will explain it in my next blog... Until then, stay tuned....  :)

Tuesday 4 August 2015

Big Data: Hadoop Distributed File System

Welcome back.
Last time I talked about Hadoop's components, HDFS and MapReduce. This time let's start with the basics of the first component: HDFS.

Hadoop Distributed File System, or HDFS, is, as the name suggests, a distributed file system. It is based on a master-slave architecture and is composed of three components:
  1. Namenode: It's the master part of the file system. Only one node of the system can serve as the namenode. It not only manages the datanodes, but also keeps the index of the file system. Just like any other file system, this file system is unusable if the index is unavailable, which makes the namenode the most important part of the file system.
  2. Datanode: It's the slave part of the file system. One can add as many datanodes as they want (ideally; but just like any other system, it's not ideal). They should all be the same in terms of hardware configuration to keep the system balanced. (Note that the namenode should be of superior hardware configuration, but the same will also do.) They are the means to horizontally scale your system. The crash of a datanode can be easily handled by the system.
  3. Secondary Namenode: It keeps a copy of the namenode's file system image and the edit log. It periodically merges the file system image with the edit log to prevent the edit log from becoming too large. So it holds a copy of the namenode's image merged with the edit log and can be used in the event of namenode failure, but it can't act as a namenode itself. Also, the image on the secondary namenode is only updated at intervals, so the edits made between the last merge and the failure will be lost; hence some data loss is almost certain if the namenode goes down.
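The division of labour above can be pictured with a toy data structure: the namenode holds only metadata (which blocks make up a file, and where each block's replicas live), while the datanodes hold the actual bytes. Everything below is made up for illustration.

```python
# Toy picture of the namenode's "index": file -> blocks -> replica locations.
namenode_index = {
    "/logs/app.log": {
        "blk_001": ["dn1", "dn2", "dn3"],   # 3 replicas of the first block
        "blk_002": ["dn2", "dn3", "dn4"],   # 3 replicas of the second block
    },
}

# Datanodes hold the actual bytes, keyed by block id.
datanodes = {
    "dn1": {"blk_001": b"first 64MB..."},
    "dn2": {"blk_001": b"first 64MB...", "blk_002": b"next 64MB..."},
    "dn3": {"blk_001": b"first 64MB...", "blk_002": b"next 64MB..."},
    "dn4": {"blk_002": b"next 64MB..."},
}

def read_file(path):
    """A client asks the namenode for locations, then reads from datanodes."""
    data = b""
    for block, locations in namenode_index[path].items():
        data += datanodes[locations[0]][block]   # read from the first replica
    return data

print(read_file("/logs/app.log"))
```

Notice that losing dn1 costs nothing (every block still has replicas elsewhere), but losing the index dictionary makes the bytes on the datanodes unrecoverable, which is exactly why the namenode is the critical piece.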

 HDFS has some base assumptions:

  1. It's a write-once, read-many-times system: The first and foremost thing to understand is that HDFS can keep all the data you need it to store, but it can't change that data. To be more precise, once you put a file in HDFS, it can't be modified, because that would be a very costly operation.
  2. Datanodes are bound to fail: As HDFS uses commodity hardware, it assumes that datanodes will fail.
    1. To handle failure, HDFS keeps multiple replicas of the same file on different datanodes. The number of replicas is configurable. It depends on a number of factors, but most simply it is a trade-off between how much space you are willing to spend on replicas and how available you want the file to be. (Note that the more replicas a file has, the better the chance that you will find one of them on the node, or a neighbouring node, where you are working.)
    2. To detect failure, datanodes send heartbeat signals to the namenode every few seconds (configurable, default is 3). If the namenode doesn't receive these signals for some interval (again configurable), it considers that datanode dead (crashed) and starts taking appropriate measures (I'll explain the measures in a later post).
  3. A small number of large files: It's better to have a small number of large files in HDFS rather than a large number of small files. The namenode acts as the index for all files in HDFS, and the index for a large number of files takes up more space and fills the namenode's memory rapidly. There are workarounds in HDFS for this situation, but it's still better to avoid it.
  4. File corruption can be detected, not corrected: HDFS can detect corrupted files but can't correct them.
    1. To detect corruption, HDFS uses a cyclic redundancy check (CRC) to determine whether a file is correct or corrupted. HDFS keeps a .crc file alongside the original file for this check. The .crc file is really small; its storage overhead is less than 1%.
    2. To handle corruption, HDFS simply puts the file in a corrupted-file list, so that no client is directed to it. Then HDFS creates one more replica of the file from another uncorrupted, healthy replica.
  5. Hardware configurations and job requirements are well known: HDFS (in fact, the whole Hadoop framework) has a lot of configuration parameters. Every configuration parameter has a different impact on the system as a whole, and every configuration involves a tradeoff (a situation in which you must choose between or balance two things that are opposite or cannot be had at the same time, by the dictionary definition). For example:
    1. There is a configuration parameter that indicates whether files are to be compressed or not. The tradeoff is between space and processing: if you compress your files they take up less space but need more processing, as you have to decompress them to read them.
    2. There is a configuration parameter for the heartbeat signal interval. If you set it too low, your network is used too much. If you set it too high, your namenode might miss that a datanode has crashed and divert some clients to a datanode that is not available.
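The detection half of point 4 above can be demonstrated with Python's standard zlib.crc32. Real HDFS computes its own per-chunk checksums, so this is only an analogy of the same idea; the helper names are made up.

```python
# Checksum-based corruption detection, the same idea HDFS's .crc files use.
import zlib

def store_block(data):
    """On write: keep the data together with its checksum."""
    return {"data": data, "crc": zlib.crc32(data)}

def is_corrupted(block):
    """On read: recompute and compare. Detects corruption, cannot fix it."""
    return zlib.crc32(block["data"]) != block["crc"]

replica_a = store_block(b"some block contents")
replica_b = store_block(b"some block contents")

replica_a["data"] = b"some block c0ntents"     # simulate bit rot on one replica

print(is_corrupted(replica_a))  # True  -> list as corrupted, re-replicate
print(is_corrupted(replica_b))  # False -> healthy replica used for the copy
```

The checksum tells you only *that* replica_a is bad, not how to repair it, which is why HDFS falls back to copying the healthy replica_b instead of fixing the damaged bytes.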
This was the basic idea of HDFS. In coming posts, I'll explain the different algorithms used by HDFS, failure and recovery methods in HDFS, and the read and write processes of HDFS.

Stay tuned.. Keep learning... Meanwhile please let me know whether you like it or not... :)