Monday, 27 March 2017

Parallelization of R code using Azure Infrastructure


 

Working on large data sets, exploring which machine learning algorithm fits the bill is a daunting task. Moreover these ML algorithm can run into hours and days in certain cases. There is always a need of having compute resources available on the fly. R in principle is single threaded by nature.  To support parallel constructs like parallel for , apply functions we have the parallel package in R, which supports multi core and cluster based parallel execution.  The cluster supports both PSOCK and FORK implementation.

doAzureParallel R package is a lightweight R package built on top of Azure Batch Service (job scheduler service) that allows use of Azure compute resources from the R session. doAzureParallel supports the foreach parallel construct.

Getting started with doAzureParallel

Below video will walk you through on the basics of doAzureParallel.

 

 doAzureParallel does not have parallel constructs for apply function, If one did require to use them they can use the parallel package on the node on the cluster and get the best of parallel apply functions. With parallelism comes a degree of complexity of memory management and caching and understand how can FORK help for same. The below video explains  how to use parallel package and take the parallel execution of the code down to a core level.

Running Parallel constructs along with DoAzureParallel

 

Parallelization to MLR algorithms

DoAzureParallel in its current form supports foreach , it needs to graduate to support parallel apply functions. Taking the discussion to the next level it would be lovely if doAzureParallel would support mlr (classification, regression) set of algorithm to run in parallel.  The current set of algorithm like parallelmap, batchjobs and mlr solve the problem of running the Mlr algorithms . It’s pretty easy to see how a larger model, more iterations or a different choice of methods could result in unacceptably long run-times. One could use multi-core or socket level parallelism, but ideally taking advantage of as much computing resource is better choice,.

Apparently the batchjobs package doesnt support azure batch service.

ParallelMap is now directly integrated into mlr, and this makes scaling to parallel back-ends seamless. Our choice of back-end is parameterized so we can write algorithms once and choose the parallel back-end depending on the resources we have available when we run the model. To illustrate this, we re-run the same model, but instead of running the model on a single node, we run it on a clustered environment running OpenLava, an open-source Platform LSF compatible workload manager now supported by BatchJobs.

Below video explains how to use parallemap, mlr in a mult-core scenario along with doAzureParallel.

 

 

Demo codebase can be found here - https://github.com/ajayso/DoAzureParallel

 

Saturday, 4 March 2017

Azure Data Factory–DE glossed


 

 

Having worked on the Apache stack for sometime, I decided to look at Azure Big Data stack.  My starting point is data ingest.For most big data projects the journey starts out with data ingest, clean, transform and have it ready for analysis. Azure Data factory is MSFT Azure offering for cloud based data integration service that automates the movement and transformation of data. At a very basic level below is a representation of data lifecycle in big data projects

image

 

Azure Data Factory has the following constructs

- Linked Services have the define where the data has to be sourced from/to.

- Pipeline and Activities – Pipelines are a logical group of activities that performs the job of moving data from/ to.

- DataSets – Linked services interfaces the Data Factory to the external data sources. Datasets are a representation of the data store.

image

Linked Services provides for the interfaces to external sources, currently the support is limited to Azure, Databases, File based, Salesforce, OData a complete list can be found here.

From a customization stand point of view one can create custom activities. I have the linked service limited.  On the contrary Apache NiFi seems to have a better in multiple ways

- Intuitive UI - NiFi designer.Dataflows can become quite complex. Being able to visualize those flows and express them visually can help greatly to reduce that complexity and to identify areas that need to be simplified. NiFi enables not only the visual establishment of dataflows but it does so in real-time. Rather than being design and deploy it is much more like molding clay.

-  Better support for external sources linked services in Azure Data Factory a compared to processors in NiFi, have seen NiFi comes out better , list can be found here.

- NiFi is highly fault tolerent

- Superior Exception Handling – finer details here.

Saturday, 28 January 2017

Real-time Financial Stocks Analysis Architecture


 

 

In the prior 2 posts, the focus was more on using machine learning techniques like regression to predict gold buy / sell signals. While the models that we built ,give an idea on how to get to a final buy and sell signal for gold with the assumption data is clean and always available. Without relevant clean data, the model predictions would be of zero relevance to the business.

In every big data analysis project which heavily rely on real time data, a lot is dependent on the underlying software architecture which is responsible to deliver the data in an edible form for the models. In this post I have attempted to put together high  level architecture of a real time stock analysis platform. This is a high level architecture of XTrade platform is current in production for one of the customers.

 

Additionally I have a starters kit available at https://github.com/ajayso/XeusTrade.git which comprises of templates for all the components.

A Brief on XTrade - Day trading can be risky business, human analysis of real time data without intelligent insights can be detrimental. XTrade is real time stock technical analysis platform which ingest real time stock feed , industry data and news and analyse to provide predictions, correlations (weak and strong) called quants. XTrade interfaces with trading systems to execute the actions or provide these actionable insights to an average trader or analysts.

image

The architectures for most real-time system are in line with the lambda architecture. This post will focus on the speed (real-time processing) area of the lambda architecture.

image

 

Getting the Data In ……

Data comes in from multiple sources and can be varying formats and segregating relevant data needs a specialized software. XTrade had multiple data sources below are some of the more relevant ones.

  • Stock feeds
  • Industry feeds
  • News data
  • Other relevant data

The requirement is to pull data feeds from the data sources at a specified frequency( in minutes). Stale data management (dont pull stale data) and transformation to standard format in this case json is something which Apache NiFi provides for absolute ease. Apache NiFi is basic architectural building block for data ingest , transformation. NiFi has many processors with the options of writing your own processors in java.

Apache NiFi is an enterprise integration and dataflow automation tool that allows a user to send, receive, route, transform, and sort data, as needed, in an automated and configurable way. Similar tools exist, but NiFi is different because of its user-friendly drag-and-drop graphical user interface and the ease with which it can be customized on the fly for specific needs. Think of creating a simple flow chart of what you want to do with your data; that is how easy it is to create a dataflow in NIFi. It is also highly scalable and can run on something as simple as a laptop or clustered across many high-performance servers.

Below is example of XTrade Nifi data flows,

image

In the starter kit the NiFi folder has 2 templates which can be reused, these are data sources for individual stocks and news.

Implementation Details: The data flow will pull the feeds and process them, post which these need to put into a messaging systems, In this case we have used Apache Kafka. JSON is the standard data format used within this architecture.One has the option of persisting these data feeds to hdfs or any other persisted data store. Apache NiFI runs on a cluster and highly fault tolerant.

Messaging …..

The requirement of having low latency  reliable messaging system is really important. Apache Kafka -is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Apache Kafka is been used as basic architecture building block for messaging in the architecture. Below is the high level representation of Kafka implementation for XTrade

image

Implementation Details:  Brokers are segregated based on the following

  • Individual Stocks: Since XTrade handles prediction and data insight for individual registered stocks from the customers, the decision to have separate broker for the same was taken to handle future scale out requirements.
  • Broker(Industry) messaging system for industry stock prices and news.
  • Broker(misc,) messaging exposure and risk management data coming in from customer systems and other public data sources.

Core Analysis and Data Decision Making…..

Fast processing of the data streams coming from the messaging layer can really help cut down latency of the overall lifecyle. Stream processing and calling analysis model in R, spark and send back the prediction and data insights in matter on minutes is key here. A platform which has the flexibility of supporting multiple programming languages was the need of the hour.

Apache Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!

Storm has many use cases: real-time analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.

Implementation Details:  Stock and Industry data coming in from the Kafka is fetched by Storm with a Kafka Spout and further processes news and stock analysis calling R , Spark model which will emit the prediction back to Kafka topic (Broker Results), In the next post I will detail out he Spark, R model for stock technical analysis. The prediction are also written to Cassandra.

A ready skeletal code for Storm (eclipse project) written in java can be found here.

image

Storm architecture for XTrade

 

Putting it all together….

The entire solution can deployed to aws / azure ,  Managing the clusters across this distributed environment is a daunting task.

Apache Mesos looked to be a good option for the same. Apache Mesos is a centralised fault-tolerant cluster manager. It’s designed for distributed computing environments to provide resource isolation and management across a cluster of slave nodes. It schedules CPU and memory resources across the cluster in much the same way the Linux Kernel schedules local resources. Mesos support for Hadoop, NiFi, Kafka , Storm and Cassandra exists.