Sunday, 23 August 2015

HDFS Operations: Writing



The most important functions provided by any file system are reading and writing. After all, what's the point of having a storage house if you can't keep things in it and take them out?

Let's start with keeping things, i.e. writing:

DataFlow:
  • Creating a new File
    1. Client creates a file, i.e. makes an entry for a new file at its own end. It's like deciding that you need to put something in the storage house.
    2. Client makes an RPC (Remote Procedure Call) to the namenode to create a new file with no blocks associated with it. It's like calling the manager of the storage house to ask for permission to store something.
    3. The namenode performs checks to make sure the file doesn't already exist, and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file. In storage house terms, the manager checks that you don't already have a space for the same item number, and that you are allowed to have a storage space (you could be a criminal... right?). If all checks pass, your package's entry is put into the register (let's say the manager uses an entry book with receipts). After that the manager plays no role in the actual storing of the item, as he has other calls to attend to. He is just responsible for allocating space.
  • Writing Data to it
    1. As the client writes data, it is first broken down into small packets, which are added to a data queue. (Don't confuse packets with blocks: a block is the fixed-size unit in which HDFS stores a file, configurable and 64 MB by default in hadoop 1.x, and each block travels to the datanodes as a stream of many much smaller packets.) It's like the package you want to put in the storage house is collected from you, divided into smaller fixed-size packages, and added to a queue to be sent to the appropriate storage house unit.
    2. The namenode then allocates new blocks on suitable datanodes for the data and its replicas. (As I said earlier, HDFS keeps replicas of files to handle datanode failures.) It's like the manager checks the space in each storage house unit, allocates space for the packages, and gives out the addresses of the units.
    3. For each packet (taking the default replication factor of 3):
      1. The packet is written to the first datanode, which forwards it to the second datanode, which in turn forwards it to the third.
      2. The third datanode then sends an acknowledgement to the second, which in turn sends it to the first, and the first sends the acknowledgement back to the client. The write process for that packet is then finished.
  • Failure Handling: If a datanode fails during the write process
    1. All the packets still waiting to be acknowledged are put back at the front of the data queue, so that they are sent again and nothing is lost.
    2. The failed datanode is removed from the pipeline, and the rest of the block is written to the remaining healthy datanodes. The failure is reported to the namenode so that, when the failed datanode recovers, its partially written block is deleted.
    3. The block is now "under replicated" (it has fewer than the configured number of copies). When the file system notices this, it creates the missing replicas on another datanode on its own.
    4. The write still counts as successful as long as at least dfs.replication.min replicas (default 1) were written. The remaining replicas are created later, as described in the previous point.
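
To make the flow a bit more concrete, here is a minimal Python sketch of the packet pipeline and its failure handling. It is a toy model, not HDFS code: ToyDatanode, write_block, fail_on_packet and min_replicas are names I made up for illustration, and everything runs sequentially in one process, so the acknowledgement queue never holds more than one packet at a time.

```python
from collections import deque


def split_into_packets(data, packet_size):
    """Cut the client's data into fixed-size packets (the last one may be shorter)."""
    return [data[i:i + packet_size] for i in range(0, len(data), packet_size)]


class ToyDatanode:
    """Hypothetical stand-in for a datanode; it stores packets by index."""

    def __init__(self, name, fail_on_packet=None):
        self.name = name
        self.fail_on_packet = fail_on_packet  # packet index at which this node "crashes"
        self.stored = {}

    def store(self, index, packet):
        if index == self.fail_on_packet:
            raise ConnectionError(self.name + " failed")
        self.stored[index] = packet  # storing the same index twice is harmless


def write_block(packets, pipeline, min_replicas=1):
    """Send every packet down the pipeline; return the datanodes that survived."""
    data_queue = deque(enumerate(packets))
    ack_queue = deque()
    pipeline = list(pipeline)
    while data_queue:
        index, packet = data_queue.popleft()
        ack_queue.append((index, packet))  # sent, but not yet acknowledged
        failed = None
        for node in pipeline:  # first -> second -> third
            try:
                node.store(index, packet)
            except ConnectionError:
                failed = node
                break
        if failed is not None:
            pipeline.remove(failed)  # drop the bad node from the pipeline
            if len(pipeline) < min_replicas:
                raise IOError("fewer than min_replicas datanodes left")
            while ack_queue:  # unacknowledged packets go back to the front
                data_queue.appendleft(ack_queue.pop())
            continue
        ack_queue.popleft()  # acknowledgement travelled back to the client
    return pipeline


packets = split_into_packets(b"abcdefghij", 4)
nodes = [ToyDatanode("dn1"), ToyDatanode("dn2", fail_on_packet=1), ToyDatanode("dn3")]
survivors = write_block(packets, nodes)
print([node.name for node in survivors])
```

In this run dn2 dies on the second packet, so the block ends up on dn1 and dn3 only. That is exactly the "under replicated" situation the file system has to repair later.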


API Version:
  • Creating a file:   
    1. The client creates the file by calling create() on DistributedFileSystem 
    2. DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it 
    3. The namenode performs various checks to make sure the file doesn’t already exist, and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException.
    4. The DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to. FSDataOutputStream wraps a DFSOutputStream, which handles communication with the datanodes and namenode. 
  • Writing Data to it:
    1. As the client writes data, DFSOutputStream splits it into packets and puts them on an internal data queue.
    2. The DataStreamer consumes the data queue and asks the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas.
    3. For each packet:
      • The DataStreamer streams the packet to the first datanode in the pipeline, which stores the packet and forwards it to the second datanode in the pipeline. The second datanode in turn stores the packet and forwards it to the third.
      • DFSOutputStream maintains an acknowledgement queue of packets that have been sent but not yet acknowledged. The third datanode sends an acknowledgement to the second, which in turn sends it to the first, and the first sends the acknowledgement back to the client. The packet is then removed from the acknowledgement queue and the write process for that packet is finished.
    4. When the client has finished writing, it calls close() on the stream (the FSDataOutputStream it got back from create(), not on DistributedFileSystem).
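
The real API is Java, but the order of the calls can be sketched in a few lines of Python. The following is a toy model only: ToyNamenode, ToyOutputStream and the create() helper are hypothetical names that mimic the steps above, and the permission model (a plain set of allowed users) is my own simplification.

```python
class ToyNamenode:
    """Hypothetical namenode: keeps the namespace and performs the create checks."""

    def __init__(self, writers):
        self.writers = set(writers)  # users allowed to create files (simplified)
        self.namespace = {}          # path -> list of block sizes
        self.closed = set()

    def create(self, path, user):
        if path in self.namespace:
            raise IOError(path + " already exists")
        if user not in self.writers:
            raise IOError(user + " is not allowed to create " + path)
        self.namespace[path] = []    # a new file with no blocks associated with it

    def add_block(self, path, size):
        self.namespace[path].append(size)

    def complete(self, path):
        self.closed.add(path)


class ToyOutputStream:
    """Hypothetical stand-in for the stream the client writes to."""

    def __init__(self, namenode, path, block_size):
        self.namenode = namenode
        self.path = path
        self.block_size = block_size
        self.buffer = b""

    def write(self, data):
        self.buffer += data
        while len(self.buffer) >= self.block_size:  # a full block is ready
            self.namenode.add_block(self.path, self.block_size)
            self.buffer = self.buffer[self.block_size:]

    def close(self):
        if self.buffer:                             # flush the last, shorter block
            self.namenode.add_block(self.path, len(self.buffer))
            self.buffer = b""
        self.namenode.complete(self.path)


def create(namenode, path, user, block_size=4):
    """Mimics create(): ask the namenode first, then hand back a stream."""
    namenode.create(path, user)
    return ToyOutputStream(namenode, path, block_size)


namenode = ToyNamenode(writers={"alice"})
out = create(namenode, "/logs/day1", "alice")
out.write(b"0123456789")
out.close()
print(namenode.namespace)
```

Calling create() a second time for the same path, or as a user who is not in writers, raises an IOError, which mirrors the failed checks in step 3 of "Creating a file".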

Hope it's all clear to you. Again, if something is not clear, just comment and ask.
One very important topic related to file writing in HDFS is "Replica Placement".
I will explain it in my next blog... Until then, stay tuned....  :)

Tuesday, 4 August 2015

Big Data: Hadoop Distributed File System

Welcome back.
Last time I talked about hadoop's components, HDFS and MapReduce. This time let's start with the basics of the first component: HDFS.

Hadoop Distributed File System, or HDFS, is, as the name suggests, a distributed file system. It is based on a master-slave architecture and is composed of three components:
  1. Namenode: It's the master part of the file system. Only one node of the system can serve as the namenode. It not only manages the datanodes, but also keeps the index of the file system. Just like any other file system, this one is unusable if the index is unavailable, which makes the namenode the most important part of the file system.
  2. Datanode: It's the slave part of the file system. Ideally you can add as many datanodes as you want (though, like any real system, it's not ideal). They should all have the same hardware configuration to keep the system balanced. (Note that the namenode should preferably have a superior hardware configuration, but the same configuration will also do.) Datanodes are the means to horizontally scale your system. The crash of a datanode can be easily handled by the system.
  3. Secondary Namenode: It keeps a copy of the namenode's file system image and edit log. It periodically merges the file system image with the edit log to prevent the edit log from becoming too large. So it holds a copy of the namenode's image merged with the edit log, which can be used in the event of namenode failure. But it can't act as a namenode itself. Also, the image on the secondary namenode is only updated at intervals, so the edits made between the last merge and the failure are lost. Hence some data will almost certainly be lost if the namenode goes down.
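
Here is a small Python sketch of what that periodic merge amounts to. The real file system image and edit log have their own on-disk formats; the dict and the tuples below (and the operation names "create", "add_block", "delete") are just a made-up representation to show the idea.

```python
def apply_edit(image, edit):
    """Apply one edit-log entry to an in-memory image (path -> list of block ids)."""
    operation, path = edit[0], edit[1]
    if operation == "create":
        image[path] = []
    elif operation == "add_block":
        image[path].append(edit[2])
    elif operation == "delete":
        image.pop(path, None)


def checkpoint(fsimage, edit_log):
    """Merge the edit log into a copy of the image; return the new image and an empty log."""
    merged = {path: list(blocks) for path, blocks in fsimage.items()}
    for edit in edit_log:
        apply_edit(merged, edit)
    return merged, []


fsimage = {"/a": ["blk_1"]}
edit_log = [("create", "/b"), ("add_block", "/b", "blk_2"), ("delete", "/a")]

# The secondary namenode does this merge at intervals.
secondary_image, edit_log = checkpoint(fsimage, edit_log)

# Edits made after the merge exist only on the namenode ...
edit_log.append(("create", "/c"))

# ... so if the namenode dies now, the secondary's image does not know about /c.
print(secondary_image)
```

The last comment is the whole point: whatever sits in the edit log since the last merge is missing from the copy on the secondary namenode.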

HDFS has some base assumptions:

  1. It's a write once, read many times system: The first and foremost thing to understand is that HDFS can keep all the data you need it to store, but it can't change your data. To be more precise, once you put a file in HDFS, it can't be modified, because that would be a very costly operation.
  2. Datanodes are bound to fail: As HDFS uses commodity hardware, it assumes that datanodes will fail.
    1. To handle the failure, HDFS keeps multiple replicas of the same file on different datanodes. The number of replicas is configurable. It depends on a number of factors, but most simply it is a tradeoff between how much space you can afford to spend on replicas and how available you want the file to be. (Please NOTE that the more replicas a file has, the higher the chance that you will find one of them on the node where you are working, or on a neighboring node.)
    2. To detect the failure, datanodes send heartbeat signals to the namenode every few seconds (configurable, default is 3). If the namenode doesn't get these signals for some interval (again configurable), it considers that datanode dead (crashed) and starts taking appropriate measures (I'll explain the measures in a later post).
  3. Small number of large files: It's better to have a small number of large files in HDFS than a large number of small files. The namenode acts as the index for all the files in HDFS. The index for a large number of files takes up more space and fills the namenode's memory rapidly. There are workarounds in HDFS for this situation, but it's still better to avoid it.
  4. File corruption can be detected, not corrected: HDFS can detect corrupted files but can't repair them in place.
    1. To detect corruption: HDFS uses a cyclic redundancy check (CRC) to determine whether the data is correct or corrupted. It keeps the checksums in a small separate file stored alongside the data. This checksum file is really small: its storage overhead is less than 1%.
    2. To handle corruption: HDFS simply puts the corrupted copy on a list of corrupted replicas, so that it doesn't direct any client to it. Then it creates one more replica from another uncorrupted, healthy replica.
  5. Hardware configurations and job requirements are known well: HDFS (in fact the whole hadoop framework) has a lot of configuration parameters. Every configuration parameter has a different impact on the system as a whole, and every one comes with a tradeoff (dictionary definition: a situation in which you must choose between or balance two things that are opposite or cannot be had at the same time). For example:
    1. There is a configuration parameter to indicate whether files should be compressed or not. The tradeoff is between space and processing. If you compress your files, they take up less space but need more processing, as you have to decompress them to read them.
    2. There is a configuration parameter for setting the heartbeat interval. If you set it too low, your network is used too much. If you set it too high, your namenode might not notice for a while that a datanode has crashed, and may direct some clients to a datanode that is not available.
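
The heartbeat check from assumption 2 is simple enough to sketch in Python. This is only an illustration of the idea: HeartbeatMonitor is a made-up class, time is passed in as plain numbers so the example is repeatable, and the 30 second timeout is a value I picked for the example, not a hadoop default.

```python
class HeartbeatMonitor:
    """Hypothetical namenode-side bookkeeping of the last heartbeat per datanode."""

    def __init__(self, timeout_seconds):
        self.timeout = timeout_seconds
        self.last_seen = {}

    def heartbeat(self, datanode, now):
        self.last_seen[datanode] = now

    def dead_nodes(self, now):
        """Datanodes whose last heartbeat is older than the timeout."""
        return sorted(name for name, seen in self.last_seen.items()
                      if now - seen > self.timeout)


HEARTBEAT_INTERVAL = 3  # seconds, the default mentioned above
monitor = HeartbeatMonitor(timeout_seconds=30)  # assumed timeout, for illustration only

for now in range(0, 41, HEARTBEAT_INTERVAL):
    monitor.heartbeat("dn1", now)      # dn1 keeps sending heartbeats
    if now <= 3:
        monitor.heartbeat("dn2", now)  # dn2 crashes after its heartbeat at t=3

print(monitor.dead_nodes(now=30))  # dn2 has been silent for 27s: not declared dead yet
print(monitor.dead_nodes(now=40))  # silent for 37s: now it is
```

You can see the tradeoff from point 5 here: a longer interval or timeout means less network traffic, but dn2 stays on the "alive" list for longer after it has actually crashed.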
This was the basic idea of HDFS. In the coming posts, I'll explain the different algorithms used by HDFS, its failure and recovery methods, its read and write processes, etc.

Stay tuned.. Keep learning... Meanwhile please let me know whether you like it or not... :)

Thursday, 30 July 2015

Big Data: Hadoop Distributed Framework

Let's move on to the various technologies that come under big data.

I am starting with hadoop, one of the most used (and criticized, not sure why) distributed frameworks for big data.
(FYI, as I am starting with the basics, I am explaining hadoop 0.x and 1.x, NOT 2.x or YARN.)

Distributed frameworks are the building blocks of distributed systems. They define the system's behavior in terms of important aspects like architecture, synchronization algorithms, load balancing, fault tolerance, failure recovery algorithms, programming paradigms and a lot more...
 
Hadoop is composed of two components:
  1. HDFS (Hadoop Distributed File System): It's a distributed file system based on GFS (Google File System).
  2. Map Reduce: A programming paradigm (way of programming). It has two phases, map and reduce, hence the name.

Hadoop as a distributed framework:

Architecture:
Hadoop has a master-slave architecture, i.e. one node (I hope you understand what a node is) acts as the master or manager (an efficient one, not the one everyone's bitching about, just kidding) and all other nodes act as slaves or workers.
  1. HDFS: For HDFS, the namenode is the master and the datanodes are the slaves. Datanodes keep the data, and the namenode acts as the index for the data.
  2. Map Reduce: For map reduce, the master is the jobtracker and the slaves are the tasktrackers. A tasktracker runs the processes for the map phase and the reduce phase. The jobtracker acts as the manager and decides where to run which process.


Synchronization Algorithms:
As hadoop has a master-slave architecture, synchronization is naturally done by the master. Slaves send messages to the master at a fixed interval. These messages are called heartbeat messages, as they tell the master that a slave is alive. They also carry other information, like process completion (on a tasktracker), space remaining (on a datanode) etc.


Fault Tolerance and failure recovery:
As these nodes are commodity hardware, they are bound to fail.
  1. A slave's data is replicated (copied) on multiple nodes to make the cluster fault tolerant. Whenever one of them fails, the master just redirects the reads to the copies on other nodes.
  2. The master's data (the index of the files on the slaves) is copied to another node. On failure, the copied data has to be copied back to the master, and the master has to be restarted by an admin. (FYI this problem has been solved in hadoop 2.x)

Programming paradigm:
As hadoop needs to run programs as distributed processes, it has a different way of programming. It is called map reduce. Everything is seen as a key-value pair.
Map is used to create the right key-value pairs, and reduce, as the name suggests, aggregates the data. (Again FYI, this map is not the data structure map, it's a phase.)
E.g. a blogger's log has to be examined to find the referring urls and the number of references per url. The map phase will emit each referring url as the key with one (1) as the value, and the reduce phase will add up all the occurrences (the 1s) for each key, i.e. for each referring url:
Map: <URL1,1>;<URL1,1>;<URL2,1>
Reduce:<URL1,1+1>;<URL2,1>
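
The same example can be written as a few lines of plain Python. This is not hadoop's API, just the two phases written as ordinary functions (plus the grouping step in between, which hadoop does for you). The log format "<page> <referring url>" is something I assumed for the example.

```python
from collections import defaultdict


def map_phase(log_lines):
    """Emit (referring url, 1) for every log line."""
    for line in log_lines:
        page, referrer = line.split()  # assumed log format: "<page> <referring url>"
        yield referrer, 1


def group_by_key(pairs):
    """Collect all values of the same key together."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(grouped):
    """Add up the 1s for each referring url."""
    return {key: sum(values) for key, values in grouped.items()}


log = ["/post1 URL1", "/post2 URL1", "/post1 URL2"]
counts = reduce_phase(group_by_key(map_phase(log)))
print(counts)
```

On a real cluster the map calls run on many nodes at once and each reduce call gets the full list of values for its keys, but the logic per key is exactly this.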


This is just the basic idea of hadoop. Each process and component will take 2-3 whole posts.

Ask questions if you don't understand something. You can even disagree with me. Looking forward to some responses....
Stay tuned for more elaborate explanations and examples...

Wednesday, 22 July 2015

Why big data - II

Let's start where we left off last time.

Let's elaborate on how big data solves the following problems:
1. The problem of storing data as it increases: Instead of one server or computer, big data systems are composed of multiple computers (called nodes). They are simple commodity hardware (in less technical terms, PCs) and hence are cheaper and easy to scale. You don't need to change the configuration of your systems or servers if you work with big data, you just have to add one more system.
For example, let's suppose you want to write a novel and you start writing it in a cheap notebook. But eventually the notebook is not sufficient for your whole story. There are two ways to solve the problem:
  1. Add more pages to the notebook, and when that is no longer possible, buy a new bigger, thicker and more expensive notebook. Then copy everything from the original notebook to the new one, and resume writing your novel. This is called vertical scaling. A pretty silly way, but that's the traditional approach with servers.
  2. Buy a new cheap notebook and just resume your writing. Go on adding as many notebooks as you want, and just create an index. The index tells you which notebook has which part of the novel, or what the sequence of the notebooks is. That is called horizontal scaling. It's the obvious and efficient way, and it is the one used by big data systems.
 

2. The problem of handling requests: As big data systems are composed of multiple computers (called nodes), the number of handlers for requests increases. Now a single system is no longer your bottleneck. The big data synchronizer just sends your request to the nearest node which is free or, in the best case, to the nearest node which is free and also has the data that the request needs to access.
Let's suppose a person grows different kinds of vegetables, and has different people working on different vegetables.
  1. If he has one farm with one entry and one exit, there is a queue of workers waiting to enter the farm. Besides, to reach a particular vegetable section, workers have to go through all the sections in between. These workers are requests (data access requests, to be more precise), and the sections of different vegetables can be different databases or directories. That is how traditional systems cause long delays.
  2. If he has multiple smaller farms, let's say one for each type of vegetable (the best case scenario), there are multiple entries and exits, and each worker can simply enter and start working. No delays from going through other sections to get to the desired one.


3. The problem of distributing computation: Big data systems don't just distribute the requests, they also distribute the data. And they don't just distribute a directory or a database, they distribute the single units of computation, like a file or a table. So even if your file is too large to fit in the memory of one system, it doesn't matter, because it is no longer in one system but is split and distributed across different systems, with indexes. If you want to access only a part of your file or table, you don't need to read the whole file and seek to that point. Also, if you want to do some manipulation, your execution works on different parts of your file at once, so the work is not only distributed but also done in parallel.
Let's say there is a log of all the events happening on amazon for one day. Now you want to find out the average sales of each product per hour.
  1. Traditional systems will read the file in chunks (of a size which can fit into memory at a time), then do the grouping and aggregation on each chunk sequentially. This results in a lot of I/O delays and processing delays, and hence is really time consuming.
  2. Big data systems will just split the log file into fixed-size chunks and give each chunk to a different node. Now the grouping and aggregation is done on different nodes in parallel. The log chunks can fit into memory, so there are far fewer I/O delays, and the process is not sequential, hence much smaller processing delays.
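
Here is a minimal Python sketch of the second approach. A few things are assumptions of mine: each log record is a (product, hour, amount) tuple, "average sales per product per hour" is read as the average sale amount for each (product, hour) pair, and threads in one process stand in for the nodes of a cluster.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(records, chunk_size):
    """Split the log into fixed-size chunks, one per "node"."""
    return [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]


def aggregate_chunk(chunk):
    """Partial result of one node: (sum, count) per (product, hour)."""
    partial = defaultdict(lambda: [0.0, 0])
    for product, hour, amount in chunk:
        entry = partial[(product, hour)]
        entry[0] += amount
        entry[1] += 1
    return dict(partial)


def merge(partials):
    """Combine the partial sums and counts, then compute the averages."""
    totals = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (amount_sum, count) in partial.items():
            totals[key][0] += amount_sum
            totals[key][1] += count
    return {key: amount_sum / count for key, (amount_sum, count) in totals.items()}


def average_sales(records, chunk_size, workers=4):
    chunks = split_into_chunks(records, chunk_size)
    with ThreadPoolExecutor(max_workers=workers) as pool:  # threads play the nodes
        partials = list(pool.map(aggregate_chunk, chunks))
    return merge(partials)


records = [("book", 9, 10.0), ("book", 9, 20.0), ("pen", 9, 2.0), ("book", 10, 30.0)]
averages = average_sales(records, chunk_size=2)
print(averages)
```

Note that each chunk returns sums and counts, not averages. The same key can show up in several chunks, and averaging the per-chunk averages would give the wrong answer whenever the chunks hold different numbers of records for that key.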

Hope this time you got the whole concept, not just the gist. If you are not convinced, just comment and ask. I'll be more than happy to help.
Please share your valuable suggestions as well.

Stay Tuned for more...




Sunday, 12 July 2015

Why Big data?

After my last post, if you've decided that you actually need big data, then let's start deep dive level I.

When do you need big data, or when do your traditional systems fail:

As the data size increases, the problem of storing it arises. This is usually solved by increasing your storage size. Then comes the problem of computation on such huge data, so you increase your RAM. When that stops working, you switch to servers with loads of storage and RAM.

Then starts the problem of handling requests. As the size of the organization or the popularity of the website increases, the number of users increases, and ultimately the number of requests for data access increases. But your server, no matter how powerful it is, is after all a single device, and there's a limit on how much it can handle at once. It then becomes a bottleneck (the most used and boring term in the books). It's as if hundreds of cars are trying to enter a city which is big enough to accommodate the cars and their people (no problem there), but the highway into the city is jam packed.

Now you need a system that distributes the requests and the data. Here comes your distributed RDBMS. You still have the benefits of an RDBMS, and you have distributed your databases across different systems. Your requests will now be distributed to different systems, which will process them and provide you with the data you want. It's like creating different entrance routes to different parts of the city: all the traffic gets distributed between these entrances on the basis of the part of the city a person wants to go to. But this distributes the requests, not the computation.

Now starts the ultimate problem: distributing your computation or data manipulation. Let's say you have a table with 100 GB of data, and you have to do a computation on it, e.g. you want to do a sum along with a group by on it. While a distributed RDBMS distributes the databases, it doesn't distribute your table. As your table is on a single machine, your computation, no matter how big, is also on a single machine. Your system goes madly slow and you of course go mad. So, none of the above solutions can help you. And here comes the ultimate rescuer, big data. Say Hellooo people :)



Why Big Data:

Here's how big data will solve the problem. It will break your table into multiple splits, to be put on multiple systems of your cluster (nodes of the cluster, to be precise), and the computation will be done on multiple nodes instead of one. Now you have data on multiple nodes, and each part is being processed by one node. Now not only your data and your requests, but also your computation is distributed.
Result: Neither you nor your system goes mad, as it's not one system at all, so there is no single limitation or bottleneck.

It's ok if you don't fully understand it yet, it's enough to get the gist of it. Stay Tuned for more...

Wednesday, 8 July 2015

Big Data: Do you need it

Big data is one of the most misunderstood terms nowadays. Every article and book on big data starts with how much data has become available. Facts and figures are given on how dramatically data volume has increased over the years with the internet boom, and all this is said to be what created the need for big data technologies. It's not fully wrong, but it's misdirecting, and is misinterpreted big time.

It is often interpreted that the need for big data arose from the need to store that much data. But actually it wasn't just the storage: the computation was the biggest problem. The need for big data is defined by the amount of data you need to process at a time.

Let's start with an example. There is an e-commerce company with lots of data about everything, from products, their prices, categories etc. to user profiles and user logs. They are using an RDBMS for storing their product and user data, and simple log files for user logs.
They have loads of data, probably a few hundred GBs, for products and user data. Now do they need big data for this? The answer, surprisingly, is no. The data is in hundreds of GBs, but they don't need to process it all at once. At most they probably need to join 4-5 tables (or fewer if they have an efficient schema). Big data will harm them more than help them. They can simply use a distributed RDBMS (no synchronization needed).
On the other hand, suppose they decide to analyze their user logs to find patterns and decide marketing strategies. That's a real choice, whether to use big data or not, based on the volume of logs per day and how far back in history they want to go. If they are popular and have GBs of data per day, and:
  1. want to analyze data for just one day: just load the relevant part into the RDBMS every hour or so
  2. want to analyze data for a month: probably won't work with an RDBMS in their case (will still work if the logs are small, just 2-3 GBs)
  3. want to analyze data for one or more years: it's better to switch to big data tools, as it's too cumbersome for one system to handle computations on such huge data.

Thus the biggest question everyone needs to ask before starting the switch to big data is whether it is actually needed. If you answer it right, it might save you a whole lot of trouble.