CPA Ads Performance Prediction Using Presto, Hazelcast & HBase

By Michael Kupferman │ Intango’s VP of R&D

Intango’s Magic Revealed (some of it).

Warning: This post is super geeky and technology oriented

A short preface about Intango

Intango (http://www.intango.com) is a technology company (established 2008) that offers an industry-leading optimization platform for online advertising campaigns and monetization tools for publishers.

Intango manages its own publisher community as well as demand partners (advertisers of many kinds).

Intango’s platform also conducts Real-Time-Bidding (RTB), leveraging several protocols and industry standards such as RTB 2.3 in several verticals such as Display and Video.

Overview

For each and every one of billions of daily opportunities to display an ad to a user (‘impressions’), the best performing ad should be selected out of hundreds of thousands of ads in the inventory.

Moreover, as the same opportunity goes out to an auction (RTB), an external CPV (‘Cost Per View’) bid should be compared with that best internal ad.

Each impression has a ‘footprint’ which is referred to as ‘the impression key’.

The footprint is a combination of attributes defining the specific occasion. The key can contain values such as:

  • Country of the user (US, UK etc.), also referred to as ‘GEO’
  • Browser (Chrome 5.1.0, Firefox 34 etc.)
  • Device (Computer, Tablet etc.)
  • Operating System (Mac, Windows etc.), also referred to as ‘OS’
  • Connection type (Mobile Carrier, Cable etc.)
  • ISP or Carrier
  • Time of Day

And so forth.
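As an illustration only, the impression key can be pictured as the listed attributes joined into a single lookup string. The class name, the `|` separator and the attribute order below are assumptions made for this sketch, not the actual Intango format.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: joins impression attributes into one lookup key. */
final class ImpressionKey {
    // Assumed separator; any character that never appears in the values would do.
    private static final String SEPARATOR = "|";

    static String of(String geo, String browser, String device, String os,
                     String connection, String carrier, int hourOfDay) {
        List<String> parts = Arrays.asList(
                geo, browser, device, os, connection, carrier,
                String.valueOf(hourOfDay));
        return String.join(SEPARATOR, parts);
    }
}
```

Keeping the attribute order fixed matters: the same impression must always produce the same key, otherwise its statistics are scattered across several entries.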

On top of those (some per user visit) parameters, there are extra attributes that might further distinguish this opportunity, such as: the web site vertical, the keywords/context of the visit, some information about the user, capping etc.

Filtering:

Based on such a wide set of parameters, a quick ad-filtering process should be conducted, as some advertisers do not target users with the attributes described by the key.

Sorting:

The remaining ads should be sorted based on their past performance under the attributes of this impression key.

In order to do so, the system must define a rank that represents the CPM value (Average income per 1,000 impressions) of the individual ad for this unique case.

The system might also assign a weight between 0 and 1 to this predictive rank. This weight might represent considerations such as the level of certainty or the importance of this impression (first time for the user, etc.).
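The filtering and sorting steps can be sketched roughly as follows. This is a minimal illustration, assuming that targeting is reduced to a set of countries and that the final score is simply the rank multiplied by its weight; the class and field names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of the filter-then-sort step. */
final class AdSelection {
    static final class Ad {
        final String id;
        final Set<String> targetedGeos; // assumption: targeting reduced to countries
        final double cpmRank;           // predicted CPM for this impression key
        final double weight;            // certainty/importance, between 0 and 1

        Ad(String id, Set<String> targetedGeos, double cpmRank, double weight) {
            this.id = id;
            this.targetedGeos = targetedGeos;
            this.cpmRank = cpmRank;
            this.weight = weight;
        }

        // Assumption: the weight scales the rank linearly.
        double score() {
            return cpmRank * weight;
        }
    }

    /** Drops ads that do not target the GEO, then sorts best score first. */
    static List<Ad> rank(List<Ad> inventory, String geo) {
        return inventory.stream()
                .filter(ad -> ad.targetedGeos.contains(geo))
                .sorted((a, b) -> Double.compare(b.score(), a.score()))
                .collect(Collectors.toList());
    }
}
```

The first element of the returned list is the ‘best internal ad’ that would be compared against an external bid.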

The rank:

This rank assigned to an ad is actually a predictive value, representing the probability of the display of an advertisement item to convert to income.

This ‘rank’ value holds major importance, as it carries the risk of not serving the alternatives and, in the worst case, can be the difference between making zero money and charging full cost from a second or third bidder.

The complex task of calculating such a predictive rank for any given permutation in the system is commonly referred to as ‘The ad serving algorithm’.

Such algorithms (‘The Algo’) require a robust and scalable historical statistic data storage (‘Big-Data’) with strong querying capabilities.

The Algo requires heavy batch calculation processes as well as real-time calculations and decision making.

The entire process, at a traffic rate of 100,000 requests per minute, should not take more than 10 milliseconds per request on a standard benchmark machine: a 12-core Unix machine with 32GB of RAM.

The Challenge

In order to determine the most accurate CPM rank of a CPA (Cost Per Action) ad, for a given impression key, the system has to ‘look back’ at the statistics and calculate the total revenue divided by the total impressions and multiplied by 1,000 (to normalize to a per-mille value).

CPM = (Total Revenue / Total Impressions) × 1,000

The most complex formula in this paper

The following considerations should also take place:

  1. How far into the past should we look?
  2. What is the smallest amount of data considered ‘enough to trust’, and how do we determine the rank ‘strength’ that measures its certainty?
  3. What should we do if there is ‘not enough’ data (yet?): look at the average performance at country/device level? What weight/strength should we assign to this ad?
  4. What do we do if there is a significant, non-negligible variance between the calculated rank of the last day/hour and that of a wider period of time?
  5. How will we measure the quality of our prediction?
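To make the formula and the ‘strength’ question concrete, here is a minimal sketch. The CPM calculation follows the formula above; the strength function and its 10,000-impression threshold are pure assumptions for illustration and are not the values used in production.

```java
/** Hypothetical sketch: CPM from history, plus an assumed certainty weight. */
final class CpmRank {
    // Assumed threshold: impressions needed before a rank is fully trusted.
    static final long MIN_IMPRESSIONS = 10_000;

    /** CPM = total revenue / total impressions * 1,000. */
    static double cpm(double totalRevenue, long totalImpressions) {
        if (totalImpressions <= 0) {
            return 0.0; // no history yet, nothing to predict from
        }
        return totalRevenue * 1000.0 / totalImpressions;
    }

    /** Assumed linear strength: 0 with no data, 1 once the threshold is reached. */
    static double strength(long impressions) {
        if (impressions <= 0) {
            return 0.0;
        }
        return Math.min(1.0, (double) impressions / MIN_IMPRESSIONS);
    }
}
```

For example, 50.00 in revenue over 20,000 impressions gives a CPM of 2.5. A key with only 5,000 impressions would get a strength of 0.5 under this assumed rule, which is one possible signal for falling back to a country/device level average.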

This post will not cover and will not answer all of the above questions, but will present a software architecture approach taken to satisfy the following challenge:

“How can an hourly changing performance value, based on a subset of terabytes of statistical data, be calculated and shared by a distributed, large-scale, 24×7 cluster of ad servers?”

The big-data paradigm

The first step in meeting the challenge is, as mentioned above, to obtain a stable and robust Big-Data infrastructure.

As a month’s worth of data, containing impressions and sales, runs to dozens of terabytes, keeping several months of data (and growing) requires an elastic solution – a solution that is not bound to a certain number of physical machines or storage size.

First-Layer:

We have selected HDFS: Hadoop DFS (Distributed File System) coupled with Hive which gives it structural capabilities and interfaces.

Our live servers write data sequentially to files, that are, in turn, pushed into Hadoop, based on a Hive layer which gives them some structural sense and enables our next layer to perform relatively efficient queries.

Key points in implementing Hadoop and Hive and scaling up:

  1. Know your data – partition smartly:
    As we know our queries can be date driven to the day, we chose to partition most of our data by the date. We push the data to the partition ‘tipping’ the system to use the date value as the partition (in Hadoop, this translates into an actual file system folder)
  2. Compress raw data:
    When large data sets are returned from a query – the immediate performance bottleneck becomes the size, hence: I/O. Hadoop allows us to define zipped partitioned tables which dramatically improves performance.
  3. Conduct aggregations within Hive ‘inside the box’:
    For predefined, known key-based aggregations, it’s best to define periodical scripts that ‘insert from select’ inside the big-data infrastructure.
  4. Keep your Hive schema information in an adjacent RDBMS (such as MySQL) – It’s faster!

Second-Layer:

In order for us to perform queries, we started off by writing custom MR (MapReduce) procedures. That held for a while, but brought huge limitations as we grew: none of the ad-hoc data mining could be accomplished without minutes to hours of idle employee time, batch processes dragged …

Presto: an engineer from Facebook recognized the same problem and, within days, developed a genius solution called Presto (https://prestodb.io/), later released under Apache License V2 to the open source community, adopted by Airbnb… and by us!

Presto provides great tooling for accessing HDFS using fully SQL-compliant queries.

Its performance is beyond compare! It has frequent new releases with new abilities (such as INSERT), and it completely turned around our infrastructure and boosted our performance.

You can think of Presto as our ‘ETL tooling’; there are of course alternatives, such as Pig and others.

HBase

(https://hbase.apache.org/)

In a nutshell: “Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS”

We looked for a way to daily and hourly add key-value entries to a global, elastic storage that will be available for direct, random access.

We could have opted for one of many known solutions such as Redis, Cassandra or another scalable NoSQL big-data solution out there.

However, we decided to follow 3 strict guidelines:

  1. Not to require learning yet another infrastructure and maintain it (thus: support Hadoop HDFS).
  2. Have proven support for java and native java capabilities such as JMX
  3. Strictly consistent reads and writes.

HBase was a perfect fit for us.

Our Java processes can now query Hadoop fast using Presto and insert averages and ranks per key per date into HBase in a very efficient and safe way, using the same architecture. And our system engineers and DevOps need not learn much more.

Happy we!

Hazelcast

Nothing and no one describes Hazelcast better than Hazelcast’s sole inventor: Mr. Talip Ozturk.

https://www.youtube.com/watch?v=t-A_N5bK0Lg

So, we had HBase, what were we missing?

HBase is trustworthy: it will persist on HDFS, it will add key-value pairs (actually more than that, using column families etc., not covered here) – but all this has a price: it will not be as fast as a distributed cache or shared memory.

While the Rank calculation process needs a lot of mid-way full and partial key calculation values, and will re-visit those on every increment (hourly and daily) – the live runtime servers do not really care, they just need the final answer, a very precise, up-to-date value to use for the ad against the impression, right?

Now, as our system will also require other ‘in-shared-memory’ values, such as balances, budgets, capping limits and other global values an ad-network and RTB exchange must maintain, the need for a distributed cache is clear. And it had better have many features and support Java objects as well.

This is where Hazelcast comes into play: we copy the ranks and averages into Hazelcast, versioned, so that our servers, all at once, will be aware of the freshest results of our algorithm.

By the way, Hazelcast provides collections, types and classes compatible with what Java gives us (we use Java 8).

> 100,000 RPS (Requests Per Second)

An interesting decision point is whether to hold the Hazelcast nodes on the same servers as their ‘clients’ (in our case: the ad-servers web cluster) or on a dedicated, separate cluster of hardware and JVMs (Java Virtual Machines).

On one hand:

We initially believed that holding Hazelcast nodes on the same servers will reduce I/O dramatically and thereby improve the total throughput and efficiency of our front-end.

On the other hand:

We encountered several challenges:

  1. The memory consumption (and limits…) of our front-end nodes reached new peaks.
  2. We had to improve our DevOps version upload procedures, as having a distributed-cache node down impacts the replication balancing and causes ‘expensive’ rebalancing operations later.

Eventually, it turned out that the I/O impact was not as big as expected, and thereby the best solution (for us) was to separate the clusters.

The Architecture

https://lh3.googleusercontent.com/B0xGKuzoQu2g0Cq4bC3y3Hbvp7ZAzvlB6doX2iqdX8ykkrdUhvuOWrtKlq-8i9V883db96RuiVAv7JqVYw1aOrXbIxJcntvA7hyHZU8sXUu145c2pmYyD7JhyDewXngGCFzOZSIS

High Level System Flow

The Combined Solution

Logging

First we use simple sequential log files written to disk, to log all our runtime front-end events. (In our case, Log4J works perfectly!)

I will address this step as “Step A”.

So, each of our ad-servers writes to its own hard drive log files continuously.

(It is important to understand log rotation and the shutdown/reload flows, to avoid losing data.)

We ended up using tab as our log delimiter; we found this best for us and our data.

Shoving

Step B: A well designed, monitored set of shell scripts takes care of pushing the log files periodically (once an hour in our case) into our HDFS via the Hive layer.

It is based on the Cloudera™ distribution of Hadoop, which already includes the required Hive interface.

Good to know: We use gzip first, as good practice, to compress the files before relocating/shoving them in. It reduces the I/O dramatically and saves us storage space on the Hadoop cluster machines.

At this step already, we ‘tip’ Hadoop to partition the files the way we want it to.

In our case, as our usage of data is mostly daily, and sometimes hourly, this will be our main key for partitioning. Any implementation should analyse and learn the frequent and massive data retrieval cases and arrange the data accordingly.

hive -e "LOAD DATA LOCAL INPATH '$SRC_FILE' INTO TABLE $HIVE_TABLE PARTITION(dt='$PARTITION_DATE')"

Example shell script snippet for pushing a source log file into Hadoop using Hive with partitioning.

Of course, one may choose not to develop their own scripts architecture and zoo-keeping tools and techniques; there are many frameworks, tools and libraries available for DevOps tasks like these. Not covered here.

Another useful tip is to convert the raw data into binary data.

As we are eventually dealing with columns, and each column has its own type, a smart infrastructure should know the optimal binary storage and access format for each data type. Hive supports this for us (compressed).

So an operation of INSERT FROM SELECT will do the job.

Aggregating

Step C – As mentioned above, knowing your data and how it is used is key to any architectural design, and even more so when dealing with big data.

So, if your users or processes have some keys they usually ‘Group By’ in their queries, we can prepare that ahead of time for them by defining more tables that are periodically aggregated by those keys.

In our prediction algorithm, we would like to aggregate ahead data per our impression-key and traffic-source. So we do that.

Again we use Hive’s INSERT FROM SELECT and add an additional GROUP BY to our statement.

(Make sure to continue using compressed binary formats for the aggregated tables).

At this stage, one must pay attention to table naming conventions.

Examples:

t_raw_* for raw data tables

t_bin_* for the corresponding binary tables

t_agg_{key}_* for aggregated tables.

Hourly Increments Calculation Batches

Now that our data is in place, compressed, structured and aggregated where required, we are ready to process it and arrange the values so that our runtime servers can easily access them in O(1).

Stage D1/D1` – Hourly

Every hour + 10 minutes, a cron (scheduled) task is invoked, ensuring the data for the previous hour is all fresh and ready.

It then queries the data using Presto SQL, and for each impression key it does the following pseudo-flow:

  1. Read today’s columns from HBase (using HBase Get methods) – For each of our measures (impressions, clicks, etc.) there is a column for the current day’s data under each key.
  2. Add the previous hour’s data to them for the given key.
  3. ‘Put’ back into HBase.

Good to know: Although it is a key-value store, HBase also has great tooling for iterating sequentially over keys, using scanners (cursors). This feature works if you make sure your key prefix is aligned with the sequence you plan to iterate through (such as date, time).
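The read, add and put flow above can be sketched as follows. To keep the example self-contained, a plain in-memory map stands in for the HBase table, and the `today:` column prefix is an assumption; a real implementation would use the HBase client’s Get and Put operations instead.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the hourly increment; a map stands in for HBase. */
final class HourlyIncrement {
    // Stand-in for the table: row key -> (column name -> value).
    final Map<String, Map<String, Long>> table = new HashMap<>();

    void applyHour(String impressionKey, Map<String, Long> previousHour) {
        // 1. "Get": read today's columns for this key (empty if the key is new).
        Map<String, Long> stored =
                table.getOrDefault(impressionKey, Collections.<String, Long>emptyMap());
        Map<String, Long> today = new HashMap<>(stored);

        // 2. Add the previous hour's measures to today's running values.
        for (Map.Entry<String, Long> measure : previousHour.entrySet()) {
            today.merge("today:" + measure.getKey(), measure.getValue(), Long::sum);
        }

        // 3. "Put": write the updated columns back.
        table.put(impressionKey, today);
    }
}
```

Note that this read-modify-write sequence is not atomic, so the sketch assumes a single batch process updates a given key at a time.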

HBase Entry:

Before going on, here is an illustration (almost true) of what is kept in HBase for each key entry:

[Figure: HBase entry]

Today:

A set of columns holding the data accumulated today; fields are incremented on the hour.

Total to Limit:

The same set, but over multiple days where this key had data (might skip days!). The limit is a set of rules such as: up to 200 days, up to x,000,000 impressions etc.

By Day:  

4 columns,

The first holds a delimited (space) list of individual days where data was accumulated.

The second holds the corresponding delimited list of impression values.

And so forth.

At this stage averages are also updated (not detailed here).

Stage D2/D2` – Daily

Once a day, the process takes the today columns and adds them to the Total columns and to the By Day columns. It then resets the today columns (sets their values to zero).

At this stage averages are also updated (not detailed here).
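A minimal sketch of the daily roll-up for a single measure (impressions) is shown below. The row is modelled as a map of column name to string value, and the column names (`today:impressions`, `total:impressions`, `byday:days`, `byday:impressions`) are assumptions for the example. The ‘limit’ rules and the averages are left out.

```java
import java.util.Map;

/** Hypothetical sketch of the daily roll-up for one measure of one key. */
final class DailyRollup {
    static void roll(Map<String, String> row, String day) {
        long today = Long.parseLong(row.getOrDefault("today:impressions", "0"));
        long total = Long.parseLong(row.getOrDefault("total:impressions", "0"));

        if (today > 0) { // days without data are skipped in the by-day lists
            row.put("total:impressions", String.valueOf(total + today));
            row.put("byday:days", append(row.get("byday:days"), day));
            row.put("byday:impressions",
                    append(row.get("byday:impressions"), String.valueOf(today)));
        }

        // Reset the today column for the next day.
        row.put("today:impressions", "0");
    }

    // Appends a value to a space-delimited list held in a single column.
    private static String append(String list, String value) {
        return (list == null || list.isEmpty()) ? value : list + " " + value;
    }
}
```

Because the day list and the value list are appended together, the n-th day always lines up with the n-th impressions value.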

Stage E – Push hourly data to Hazelcast

As our final goal is to have the statistically crunched data available to our runtime front-end ad-servers stack in direct access, the next step is to make it available for them in the best form and technology.

True, you are not mistaken: HBase is a great candidate for such a task.

However, we are purpose driven, and the data available in HBase is more than what our servers need for the real-time algorithm and service.

Moreover, Hazelcast provides a better and more native solution for our Java 8 based stack.

Our algorithm just needs the averages, totals and the today values.

So, the process takes the selected data, as calculated, and executes tasks that insert the data into our Hazelcast cluster hash.

Tip:

In some cases, it is better to use Hazelcast (HC) Executor Tasks instead of individual Put requests.

executorService.submit

Dilemma:

We can use one of two strategies to upload into the distributed cache new data for statistics per key and averages:

  1. Create a second map in HC and notify the servers somehow to switch
    • All data for all traffic is switched at once, with no mix of values
    • Rollback is easy if it is ever required
    • Takes up more memory on servers
  2. Update the same cache map
    • Does not take extra memory on client node machines
    • Simpler to manage and less code …
    • During the (hourly) updates, one impression key might get a newly calculated value while another still gets an older one, calculated in the previous hour.

Our approach:

As this data is ‘read only’ data for our servers, we also cache it locally with 1 hour expiration. So for the statistic data we chose option (2).

For the averages on subsets of keys, we chose option (1) that switches instantly after loading a second copy with the new data.
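Option (1), loading a second copy and switching to it at once, can be sketched as below. This is only an illustration of the switching idea on a single JVM: a plain Java map held in an `AtomicReference` stands in for the Hazelcast map, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of option (1): load a full second copy, then switch. */
final class SwitchableAverages {
    private final AtomicReference<Map<String, Double>> active =
            new AtomicReference<>(Collections.<String, Double>emptyMap());
    private volatile Map<String, Double> previous =
            Collections.<String, Double>emptyMap();

    /** Readers see either the old copy or the new one, never a mix. */
    void publish(Map<String, Double> fresh) {
        Map<String, Double> copy = new HashMap<>(fresh);
        previous = active.getAndSet(Collections.unmodifiableMap(copy));
    }

    /** Rolls back to the copy that was active before the last publish. */
    void rollback() {
        active.set(previous);
    }

    /** Returns the average for the key, or null if the key is unknown. */
    Double get(String key) {
        return active.get().get(key);
    }
}
```

The cost is the one listed above: while both copies exist, the data takes up roughly twice the memory.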

A/B testing the solution

Dealing with predictive algorithms is risky. A wrong weather prediction might send you out with an umbrella on a sunny day, but a wrong income prediction might cause you to lose money… lots of money.

A/B testing is a good approach.

Using the described architecture, we can do that by holding in HBASE and Hazelcast multiple schemas and teaching the components to switch between them.

So we may decide, for example, to split our traffic and test on 50% of our traffic in Mexico.

The way we implemented our code, allows us to plug A/B test functionality as separate ‘functor’ classes to be used. So the function that gets the data from the cache is A/B tested against the second copy of the data.

We also have throttles on the amount of traffic subject to a given A/B test – But this is probably a subject for a separate blog post!

So, this way, we can measure within one hour, on a very controlled set of impressions and compare CPM results of a new change to the algo or data-crunching system.

Later, the change can be deployed system-wide.
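A pluggable A/B lookup of this kind might look roughly like the sketch below. It assumes that the impression key starts with the country code followed by `|`, and that traffic is bucketed by a hash of a user id; both are assumptions for illustration, as are the class and parameter names.

```java
import java.util.function.Function;

/** Hypothetical sketch: routes a share of one country's traffic to a variant. */
final class AbTestedLookup {
    private final Function<String, Double> control; // current data copy
    private final Function<String, Double> variant; // second copy under test
    private final String geo;                       // e.g. "MX"
    private final int variantPercent;               // 0..100, the traffic throttle

    AbTestedLookup(Function<String, Double> control, Function<String, Double> variant,
                   String geo, int variantPercent) {
        this.control = control;
        this.variant = variant;
        this.geo = geo;
        this.variantPercent = variantPercent;
    }

    /** Same user always lands in the same bucket, so results stay comparable. */
    boolean inVariant(String impressionKey, String userId) {
        if (!impressionKey.startsWith(geo + "|")) {
            return false; // traffic outside the tested country is untouched
        }
        return Math.floorMod(userId.hashCode(), 100) < variantPercent;
    }

    double lookup(String impressionKey, String userId) {
        Function<String, Double> source =
                inVariant(impressionKey, userId) ? variant : control;
        return source.apply(impressionKey);
    }
}
```

With `geo` set to "MX" and `variantPercent` set to 50, roughly half of the Mexican users would be served from the second copy of the data, while all other traffic keeps using the current one.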

Conclusion

  1. Know your data
  2. Know your usage patterns
  3. Choose technology that reuses the underlying infrastructure and minimizes the number of different IT solutions to master
  4. Enjoy and read open-source projects, they can be amazing!
  5. Think outside the box. For example, a column can represent one field but can also hold multiple values.
  6. A/B test changes in prediction algorithms before you fully release them.
  7. Sometimes, the simplest method, of writing to sequential log files, can be the fastest, most flexible and practical way to collect statistics and data.
  8. Hadoop, HBASE, Hive, Presto are 100% reliable for production.

Not covered here

There is a lot of ground not covered in this article.

Maybe it will be my motivation for follow-ups…

Some notes I took for points we can elaborate on next:

  • Building a QA and Dev up-to-date partial snapshot of your Big-Data
  • A/B testing methodologies
  • How to monitor the data collection and aggregation flows
  • Replication and Partitioning strategies with Big-Data solutions
  • RTB (Real-Time-Bidding) and Big-Data
  • JavaScript solution (NodeJS), BI and Big-Data
  • New vs. Old Ads strategies
  • RTB management and the priority queues challenge
