Thursday, 9 October 2014

3 V's of Big Data

As a catch-all term, “big data” can be pretty nebulous, in the same way that the term “cloud” covers diverse technologies. Input data to big data systems could be chatter from social networks, web server logs, traffic flow sensors, satellite imagery, broadcast audio streams, banking transactions, MP3s of rock music, the content of web pages, scans of government documents, GPS trails, telemetry from automobiles, financial market data, the list goes on. Are these all really the same thing?
To clarify matters, the three Vs of volume, velocity and variety are commonly used to characterize different aspects of big data. They’re a helpful lens through which to view and understand the nature of the data and the software platforms available to exploit them. Most probably you will contend with each of the Vs to one degree or another.
Volume
The benefit gained from the ability to process large amounts of information is the main attraction of big data analytics. Having more data often beats having better models: simple bits of math can be unreasonably effective given large amounts of data. If you could run that forecast taking into account 300 factors rather than 6, could you predict demand better?
This volume presents the most immediate challenge to conventional IT structures. It calls for scalable storage, and a distributed approach to querying. Many companies already have large amounts of archived data, perhaps in the form of logs, but not the capacity to process it.
Assuming that the volumes of data are larger than those conventional relational database infrastructures can cope with, processing options break down broadly into a choice between massively parallel processing architectures (data warehouses or databases such as Greenplum) and Apache Hadoop-based solutions. This choice is often informed by the degree to which another of the Vs, variety, comes into play. Typically, data warehousing approaches involve predetermined schemas, suiting a regular and slowly evolving dataset. Apache Hadoop, on the other hand, places no conditions on the structure of the data it can process.
At its core, Hadoop is a platform for distributing computing problems across a number of servers. Developed in large part at Yahoo and released as open source through the Apache Software Foundation, it implements the MapReduce approach pioneered by Google in compiling its search indexes. Hadoop's MapReduce involves distributing a dataset among multiple servers and operating on the data: the "map" stage. The partial results are then recombined: the "reduce" stage.
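To make the two stages concrete, here is a plain-Python sketch (not Hadoop code; the sales records and region names are invented): the map stage turns each record held on each server into key-value pairs, and the reduce stage recombines everything that shares a key.

```python
from collections import defaultdict

# Invented sample records: (region, sale amount) pairs spread across "servers".
partitions = [
    [("north", 120.0), ("south", 75.5)],   # data held on server 1
    [("north", 30.0), ("east", 210.0)],    # data held on server 2
]

def map_stage(records):
    """Map: emit a (key, value) pair for every input record."""
    for region, amount in records:
        yield region, amount

def reduce_stage(pairs):
    """Reduce: combine all values that share the same key."""
    totals = defaultdict(float)
    for region, amount in pairs:
        totals[region] += amount
    return dict(totals)

# Run the map stage over each partition, then recombine the partial results.
mapped = [pair for part in partitions for pair in map_stage(part)]
print(reduce_stage(mapped))   # {'north': 150.0, 'south': 75.5, 'east': 210.0}
```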
To store data, Hadoop utilizes its own distributed filesystem, HDFS, which makes data available to multiple computing nodes. A typical Hadoop usage pattern involves three stages:
  • loading data into HDFS,
  • MapReduce operations, and
  • retrieving results from HDFS.
This process is by nature a batch operation, suited for analytical or non-interactive computing tasks. Because of this, Hadoop is not itself a database or data warehouse solution, but can act as an analytical adjunct to one.
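In practice, the load/compute/retrieve cycle above is often driven by a small script or from the shell. The sketch below shows the shape of such a script in Python; the file names, HDFS paths, streaming jar location and mapper/reducer scripts are placeholders, not part of any particular deployment.

```python
import subprocess

# All paths, file names and the streaming jar location are placeholders.
STREAMING_JAR = "/usr/lib/hadoop/hadoop-streaming.jar"  # assumed location

def run(cmd):
    """Run a shell command and raise if it returns a non-zero exit code."""
    subprocess.run(cmd, check=True)

# 1. Load data into HDFS.
run(["hadoop", "fs", "-put", "weblogs.txt", "/data/input/weblogs.txt"])

# 2. Run a MapReduce job over it (here via Hadoop streaming and your own scripts).
run(["hadoop", "jar", STREAMING_JAR,
     "-input", "/data/input", "-output", "/data/output",
     "-mapper", "mapper.py", "-reducer", "reducer.py",
     "-file", "mapper.py", "-file", "reducer.py"])

# 3. Retrieve the results from HDFS.
run(["hadoop", "fs", "-get", "/data/output/part-00000", "results.txt"])
```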
One of the best-known Hadoop users is Facebook, whose model follows this pattern. A MySQL database stores the core data. This is then reflected into Hadoop, where computations occur, such as creating recommendations for you based on your friends' interests. Facebook then transfers the results back into MySQL, for use in pages served to users.
Velocity
The importance of data's velocity, the increasing rate at which data flows into an organization, has followed a similar pattern to that of volume. Problems previously restricted to certain segments of industry are now presenting themselves in a much broader setting. Specialized businesses such as financial trading firms have long turned systems that cope with fast-moving data to their advantage. Now it's our turn.
Why is that so? The Internet and mobile era means that the way we deliver and consume products and services is increasingly instrumented, generating a data flow back to the provider. Online retailers are able to compile large histories of customers' every click and interaction: not just the final sales. Those who are able to quickly utilize that information, by recommending additional purchases, for instance, gain competitive advantage. The smartphone era further increases the rate of data inflow, as consumers carry with them a streaming source of geolocated imagery and audio data.
It’s not just the velocity of the incoming data that’s the issue: it’s possible to stream fast-moving data into bulk storage for later batch processing, for example. The importance lies in the speed of the feedback loop, taking data from input through to decision. A commercial from IBM makes the point that you wouldn’t cross the road if all you had was a five-minute old snapshot of traffic location. There are times when you simply won’t be able to wait for a report to run or a Hadoop job to complete.
Industry terminology for such fast-moving data tends to be either "streaming data" or "complex event processing." The latter term was more established in product categories before stream processing gained widespread relevance, and seems likely to diminish in favor of "streaming."
There are two main reasons to consider streaming processing. The first is when the input data arrives too fast to store in its entirety: in order to keep storage requirements practical, some level of analysis must occur as the data streams in. At the extreme end of the scale, the Large Hadron Collider at CERN generates so much data that scientists must discard the overwhelming majority of it, hoping hard they've not thrown away anything useful. The second reason to consider streaming is when the application mandates an immediate response to the data. Thanks to the rise of mobile applications and online gaming, this is an increasingly common situation.
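As a toy illustration of the first case, a stream can be boiled down to running summaries on the fly, so that only a few counters, not the raw feed, ever need to be stored. This is a minimal plain-Python sketch with an invented sensor feed, not code for any particular streaming framework.

```python
import random

def sensor_readings(n=1_000_000):
    """Stand-in for a fast sensor feed; the values are invented."""
    for _ in range(n):
        yield random.gauss(20.0, 2.0)

# Keep only a running count, mean and max rather than storing every reading.
count, mean, peak = 0, 0.0, float("-inf")
for reading in sensor_readings():
    count += 1
    mean += (reading - mean) / count   # incremental mean update
    peak = max(peak, reading)

print(f"{count} readings summarised: mean={mean:.2f}, max={peak:.2f}")
```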
Product categories for handling streaming data divide into established proprietary products such as IBM’s InfoSphere Streams, and the less-polished and still emergent open source frameworks originating in the web industry: Twitter’s Storm, and Yahoo S4.
As mentioned above, it’s not just about input data. The velocity of a system’s outputs can matter too. The tighter the feedback loop, the greater the competitive advantage. The results might go directly into a product, such as Facebook’s recommendations, or into dashboards used to drive decision-making.
It’s this need for speed, particularly on the web, that has driven the development of key-value stores and columnar databases, optimized for the fast retrieval of precomputed information. These databases form part of an umbrella category known as NoSQL, used when relational models aren’t the right fit.
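The pattern is simple: precompute results in batch, then serve them with a single key lookup at request time. A hedged sketch using the redis-py client is below; it assumes a Redis server running locally, and the key name and recommendation values are invented.

```python
import json
import redis  # requires the redis-py package and a running Redis server

r = redis.Redis(host="localhost", port=6379)

# Offline/batch side: store a precomputed result under a per-user key (invented).
recommendations = ["hiking boots", "trail map", "water bottle"]
r.set("recs:user:42", json.dumps(recommendations))

# Online side: a single key lookup at page-render time, no joins or scans.
cached = r.get("recs:user:42")
print(json.loads(cached))
```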

Variety

Rarely does data present itself in a form perfectly ordered and ready for processing. A common theme in big data systems is that the source data is diverse, and doesn't fall into neat relational structures. It could be text from social networks, image data, or a raw feed from a sensor. None of these come ready for integration into an application.
Even on the web, where computer-to-computer communication ought to bring some guarantees, the reality of data is messy. Different browsers send different data, users withhold information, and people may be using differing software versions or vendors to communicate with you. And you can bet that if part of the process involves a human, there will be error and inconsistency.
A common use of big data processing is to take unstructured data and extract ordered meaning, for consumption either by humans or as a structured input to an application. One such example is entity resolution, the process of determining exactly what a name refers to. Is this city London, England, or London, Texas? By the time your business logic gets to it, you don’t want to be guessing.
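A toy sketch of the idea follows, with an invented gazetteer and a crude context-keyword score; real entity resolution systems use far richer features, but the shape of the problem is the same.

```python
# Invented gazetteer of candidate entities for the surface form "London".
CANDIDATES = {
    "London, England": {"uk", "thames", "england", "britain"},
    "London, Texas":   {"texas", "tx", "ranch", "usa"},
}

def resolve(mention, context):
    """Pick the candidate whose clue words best match the surrounding text."""
    words = set(context.lower().split())
    scores = {name: len(words & clues) for name, clues in CANDIDATES.items()}
    return max(scores, key=scores.get)

print(resolve("London", "Flooding along the Thames brought England to a halt"))
# -> London, England
```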
The process of moving from source data to processed application data involves the loss of information. When you tidy up, you end up throwing stuff away. This underlines a principle of big data: when you can, keep everything. There may well be useful signals in the bits you throw away. If you lose the source data, there’s no going back.
Despite the popularity and well understood nature of relational databases, it is not the case that they should always be the destination for data, even when tidied up. Certain data types suit certain classes of database better. For instance, documents encoded as XML are most versatile when stored in a dedicated XML store such as MarkLogic. Social network relations are graphs by nature, and graph databases such as Neo4J make operations on them simpler and more efficient.
Even where there’s not a radical data type mismatch, a disadvantage of the relational database is the static nature of its schemas. In an agile, exploratory environment, the results of computations will evolve with the detection and extraction of more signals. Semi-structured NoSQL databases meet this need for flexibility: they provide enough structure to organize data, but do not require the exact schema of the data before storing it.
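For example, a document store will accept records whose fields differ, so a newly extracted signal can be stored without a schema migration. The sketch below uses the pymongo client and assumes a MongoDB server running locally; the database, collection and field names are invented.

```python
from pymongo import MongoClient  # assumes MongoDB is running locally

events = MongoClient("mongodb://localhost:27017")["analytics"]["events"]

# Two documents with different shapes live happily in the same collection.
events.insert_one({"user": "alice", "action": "click", "url": "/home"})
events.insert_one({"user": "bob", "action": "search", "query": "boots",
                   "sentiment": 0.8})   # a newly extracted signal, no migration

for doc in events.find({"user": "bob"}):
    print(doc)
```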

Tuesday, 7 October 2014

Big Data

What is big data?

Big data is data that exceeds the processing capacity of conventional database systems. The data is too big, moves too fast, or doesn’t fit the strictures of your database architectures. To gain value from this data, you must choose an alternative way to process it.
The hot IT buzzword of 2012, big data has become viable as cost-effective approaches have emerged to tame the volume, velocity and variety of massive data. Within this data lie valuable patterns and information, previously hidden because of the amount of work required to extract them. For leading corporations, such as Walmart or Google, this power has been within reach for some time, but at fantastic cost. Today's commodity hardware, cloud architectures and open source software bring big data processing within the reach of the less well-resourced. Big data processing is eminently feasible even for small garage startups, which can cheaply rent server time in the cloud.
The value of big data to an organization falls into two categories: analytical use, and enabling new products. Big data analytics can reveal insights hidden previously by data too costly to process, such as peer influence among customers, revealed by analyzing shoppers’ transactions, social and geographical data. Being able to process every item of data in reasonable time removes the troublesome need for sampling and promotes an investigative approach to data, in contrast to the somewhat static nature of running predetermined reports.
The past decade’s successful web startups are prime examples of big data used as an enabler of new products and services. For example, by combining a large number of signals from a user’s actions and those of their friends, Facebook has been able to craft a highly personalized user experience and create a new kind of advertising business. It’s no coincidence that the lion’s share of ideas and tools underpinning big data have emerged from Google, Yahoo, Amazon and Facebook.
The emergence of big data into the enterprise brings with it a necessary counterpart: agility. Successfully exploiting the value in big data requires experimentation and exploration. Whether creating new products or looking for ways to gain competitive advantage, the job calls for curiosity and an entrepreneurial outlook.

Sunday, 5 October 2014

Hadoop - A Map/Reduce Implementation

Hadoop is a framework for managing the processing of large datasets, analyzing them across many machines and extracting useful results.

1.1. The Magic of HDFS

The idea underpinning map/reduce, bringing compute to the data instead of the other way around, should sound like a very simple solution to the I/O bottleneck inherent in traditional parallelism. However, the devil is in the details, and implementing a framework where a single large file is transparently diced up and distributed across multiple physical computing elements (all while appearing to remain a single file to the user) is not trivial.
Hadoop, perhaps the most widely used map/reduce framework, accomplishes this feat using HDFS, the Hadoop Distributed File System. HDFS is fundamental to Hadoop because it provides the data chunking and distribution across compute elements necessary for map/reduce applications to be efficient. Since we're now talking about an actual map/reduce implementation and not an abstract concept, let's refer to the abstract compute elements now as compute nodes.
HDFS exists as a filesystem to and from which you can copy files, in a manner not unlike any other filesystem. Many of the typical commands for manipulating files (ls, mkdir, rm, mv, cp, cat, tail, and chmod, to name a few) behave as you might expect in any other standard filesystem (e.g., Linux's ext4).
The magical part of HDFS is what is going on just underneath the surface. Although it appears to be a filesystem that contains files like any other, in reality those files are distributed across multiple physical compute nodes:
(Figure: schematic depicting the magic of HDFS.)
When you copy a file into HDFS as depicted above, that file is transparently sliced into 64 MB "chunks" and replicated three times for reliability. Each of these chunks is distributed to various compute nodes in the Hadoop cluster so that a given 64 MB chunk exists on three independent nodes. Although physically chunked up and distributed in triplicate, all of your interactions with the file on HDFS still make it appear as the same single file you copied in initially. Thus, HDFS handles all of the burden of slicing, distributing, and recombining your data for you.
HDFS's chunk size and replication
The 64 MB chunk (block) size and the choice to replicate your data three times are only HDFS's default values. These decisions can be changed:
  • the 64 MB block size can be modified by changing the dfs.block.size property in hdfs-site.xml. It is common to increase this to 128 MB in production environments.
  • the replication factor can be modified by changing the dfs.replication property in hdfs-site.xml. It can also be changed on a per-file basis by specifying -D dfs.replication=1 on your -put command line, or by using the hadoop dfs -setrep -w 1 command.
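To see what these two settings imply for a given file, here is a small back-of-the-envelope sketch in Python; the 1 TB file size is just an example.

```python
import math

def hdfs_footprint(file_bytes, block_bytes=64 * 1024**2, replication=3):
    """Return (number of blocks, raw bytes stored) for one file in HDFS."""
    blocks = math.ceil(file_bytes / block_bytes)
    return blocks, file_bytes * replication

blocks, raw = hdfs_footprint(1 * 1024**4)          # a 1 TB file, default settings
print(blocks, raw / 1024**4)                       # 16384 blocks, 3.0 TB raw

blocks, raw = hdfs_footprint(1 * 1024**4, block_bytes=128 * 1024**2)
print(blocks)                                      # 8192 blocks at 128 MB
```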

1.2. Map/Reduce Jobs

HDFS is an interesting technology in that it provides data distribution, replication, and automatic recovery in a user-space filesystem that is relatively easy to configure and, conceptually, easy to understand. However, its true utility comes to light when map/reduce jobs are executed on data stored in HDFS.
As the name implies, map/reduce jobs are principally comprised of two steps: the map step and the reduce step. The overall workflow generally looks something like this:
(Figure: program flow of a map/reduce application.)
The left half of the diagram depicts the HDFS magic described in the previous section, where the hadoop dfs -copyFromLocal command is used to move a large data file into HDFS and it is automatically replicated and distributed across multiple physical compute nodes. While this step of moving data into HDFS is not strictly part of a map/reduce job (i.e., your dataset may already have a permanent home on HDFS, just as it would on any other filesystem), a map/reduce job's input data must already exist on HDFS before the job can be started.

1.2.1. The Map Step

Once a map/reduce job is initiated, the map step
  1. Launches a number of parallel mappers across the compute nodes that contain chunks of your input data
  2. Has each mapper "split" its chunk of data into individual lines of text on newline characters (\n)
  3. Hands each split (a line of text that was terminated by \n) to your mapper function
  4. Expects your mapper function to turn each line into zero or more key-value pairs and "emit" them for the subsequent reduce step
That is, the map step's job is to transform your raw input data into a series of key-value pairs with the expectation that these parsed key-value pairs can be analyzed meaningfully by the reduce step. It's perfectly fine for duplicate keys to be emitted by mappers.
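As a concrete sketch, here is a minimal word-count mapper written for the Hadoop streaming interface in Python. Streaming mappers read raw input lines on standard input and emit tab-separated key-value pairs on standard output; word count is just the classic illustration, not anything this job structure requires.

```python
#!/usr/bin/env python3
"""A minimal word-count mapper for Hadoop streaming (illustrative sketch)."""
import sys

# Hadoop streaming feeds each input split to this script line by line on stdin.
for line in sys.stdin:
    for word in line.strip().split():
        # Emit one tab-separated key-value pair per word: (word, 1).
        print(f"{word}\t1")
```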
Input splitting
The decision to split your input data along newline characters is just the default behavior, which assumes your input data is plain ASCII text. You can change how input data is split before being passed to your mapper function by using alternate InputFormats.

1.2.2. The Reduce Step

Once all of the mappers have finished digesting the input data and have emitted all of their key-value pairs, those key-value pairs are sorted according to their keys and then passed on to the reducers. The reducers are given key-value pairs in such a way that all pairs sharing the same key always go to the same reducer. The corollary is that if a particular reducer receives one pair with a specific key, it is guaranteed to receive every other pair sharing that key, and all of those pairs will arrive as one contiguous run in that reducer's input.
Your job's reducer function then performs some calculation over all of the values that share a common key. For example, the reducer might calculate the sum of all values for each key (as in the classic word-count example). The reducers then emit key-value pairs back to HDFS, where each key is unique and each key's value is the result of the reducer function's calculation.
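Continuing the word-count illustration, a minimal streaming-style reducer in Python might look like the sketch below. It relies on the sorted-by-key input described above and on the tab-separated pairs emitted by the mapper.

```python
#!/usr/bin/env python3
"""A minimal word-count reducer for Hadoop streaming (illustrative sketch)."""
import sys

current_word, current_count = None, 0

# Hadoop streaming delivers the mappers' output sorted by key, so all lines
# for one word arrive as a contiguous run on stdin.
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t")
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")   # emit the finished key
        current_word, current_count = word, int(count)

if current_word is not None:
    print(f"{current_word}\t{current_count}")            # emit the last key
```

For a quick local test, the pair can be chained with a sort in between (cat input.txt | ./mapper.py | sort | ./reducer.py), which mimics on one machine what Hadoop's shuffle does at scale.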
The Sort and Shuffle
The process of sorting and distributing the mappers' output to the reducers can be seen as a separate step, often called the "shuffle." What really happens is that as mappers emit key-value pairs, the keys are passed through the Partitioner to determine which reducer they are sent to.
The default Partitioner is a function which hashes the key and then takes the modulus of this hash and the number of reducers to determine which reducer gets that key-value pair. Since the hash of a given key will always be the same, all key-value pairs sharing the same key will get the same output value from the Partitioner and therefore wind up on the same reducer.
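In Python terms, the default rule is roughly the sketch below. Hadoop's actual HashPartitioner uses the Java hashCode of the key, so the specific assignments will differ, but the hash-then-modulus logic is the same.

```python
def partition(key, num_reducers):
    """Roughly the default partitioning rule: hash the key, take the modulus."""
    return hash(key) % num_reducers

# Within a single run, every occurrence of the same key lands on the same reducer.
for key in ["london", "paris", "london", "tokyo"]:
    print(key, "-> reducer", partition(key, 4))
```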
Once all key-value pairs are assigned to their reducers, the reducers all sort their keys so that a single loop over all of a reducer's keys will examine all the values of a single key before moving on to the next key. As you will see in my tutorial on writing mappers and reducers in Python, this is an essential feature of the Hadoop streaming interface.