Docs: Computation Layer
The Computation Layer performs some of the functions that the Serving Layer can perform when run in its stand-alone mode:
- Storing received input data from the Serving Layer
- Computing and recomputing the underlying recommender model
- Publishing the latest model for use by the Serving Layer
and some new functions:
- Discovering clusters of users and items
However, in the Computation Layer, storage and computation are distributed across many computers in order to process much more data, faster. It is implemented on top of a distributed storage system and uses Apache Hadoop, an implementation of the MapReduce distributed computing paradigm. This document covers the Computation Layer itself and how it may be run independently of the Serving Layer if desired.
Cluster Requirements
The Computation Layer requires a Hadoop cluster and a distributed file system. Two popular deployments of this infrastructure are supported. First is an existing Hadoop cluster, with its associated HDFS file system, which may already be available at your organization. Second is Amazon's Elastic MapReduce (EMR) Hadoop service and Simple Storage Service (S3) distributed storage system. In this model, Amazon creates a Hadoop cluster on demand, which is billed by the hour.
Supported Hadoop cluster versions
| Version | Notes |
|---|---|
| Apache Hadoop 0.20.20x / 1.x | |
| Apache Hadoop 0.23.x / 2.x | Supported only when running an "MRv1" cluster, not a YARN / "MRv2" cluster. Requires the binary compiled for CDH4. |
| MapR M3 / M5 | |
| HortonWorks 1.x | |
| Cloudera CDH3 | |
| Cloudera CDH4 | Supported only when running an "MRv1" cluster, not a YARN / "MRv2" cluster. Requires the binary compiled for CDH4. |
Supported Amazon AWS versions
| Version | Notes |
|---|---|
| Amazon EMR | AMI version 2.2.1 and later |
| Amazon EMR with MapR | AMI version 2.2.1 and later |
Cluster Configuration
See the Tuning Guide for suggested cluster configuration settings. This does not apply to Amazon EMR, where Myrrix can configure the cluster for you directly.
Storage Layout

The Computation Layer reads input from the distributed storage system, uses it for intermediate results, and outputs the result of each run to the same distributed storage system. This section describes how the files and data are laid out on the storage system.
Instances
All files are under one root directory, called the bucket, which should be unique to the Myrrix user calling the Computation Layer. For example, Acme Corporation might use "acme". This corresponds to a root-level directory in HDFS (e.g. /acme), or a storage bucket in Amazon S3.
Each bucket is further divided to accommodate independent instances of the Computation Layer. For example, maybe Acme is running three recommenders using Myrrix, for different departments. These different instances are identified by a unique ID. A separate directory under the root directory is created and used for each instance. For example, Acme's instance ads uses files only under /acme/ads. In Amazon's S3, this would correspond to key "ads" under bucket "acme". For simplicity, going forward, this document will refer to HDFS-style file-system-like paths instead of using S3's bucket and key naming.
Generations

Computation proceeds in generations. Input is collected from Serving Layers, and periodically, a new computation generation is run using all input received to that point. Each generation uses a new sequential sub-directory of the instance directory. For example, the first time Acme instance ads runs, it will create generation 0 in /acme/ads/0000000000. Subsequent generations 1, 2 and so on will follow in sequence, in .../0000000001 and so on.
Under a generation's directory, the Serving Layers (or any other process) write input into files in an inbound subdirectory (example: /acme/ads/0000000001/inbound). All files in this directory should be CSV-format text files, optionally compressed. Processes may copy files into this directory. Files should be closed and not be modified when a generation's computation starts.
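The path layout described above can be sketched in a few lines of Python. This is only an illustration, assuming generation numbers are zero-padded to ten digits as in the examples; the helper names generation_dir and inbound_dir are hypothetical and not part of Myrrix.

```python
def generation_dir(bucket: str, instance_id: str, generation: int) -> str:
    """Build the HDFS-style path of one generation directory."""
    if generation < 0:
        raise ValueError("generation must be non-negative")
    # Ten-digit zero padding, matching examples such as /acme/ads/0000000001
    return f"/{bucket}/{instance_id}/{generation:010d}"


def inbound_dir(bucket: str, instance_id: str, generation: int) -> str:
    """Path of the directory that receives input for a generation."""
    return generation_dir(bucket, instance_id, generation) + "/inbound"


print(generation_dir("acme", "ads", 0))
print(inbound_dir("acme", "ads", 1))
```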
The Computation Layer runs a generation. It is responsible for creating new future generations, which other processes should detect and start writing into, so that the Computation Layer may proceed on the current generation. When it finishes, the Computation Layer publishes its output -- the factored X and Y matrices, in X/ and Y/ subdirectories (example: /acme/ads/0000000001/X). These are (compressed) text files. Each line of each file contains a row or column index (that is, a user ID for X, or item ID for Y), followed by a tab, then a series of comma-separated floating-point values, which are the feature values for that row/column in X/Y. Example:
123 0.352008,-0.11290,0.982342,...
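A line in this format could be read as follows. This is a minimal sketch that assumes the file has already been decompressed to plain text; the function name parse_feature_line is hypothetical, and the ID is kept as a string to avoid assuming anything about its type.

```python
def parse_feature_line(line: str) -> tuple[str, list[float]]:
    """Split one line of an X/ or Y/ file into (ID, feature values)."""
    # The ID and the feature vector are separated by a single tab
    row_id, values = line.rstrip("\n").split("\t", 1)
    # Feature values are comma-separated floating-point numbers
    return row_id, [float(v) for v in values.split(",")]


row_id, features = parse_feature_line("123\t0.352008,-0.11290,0.982342")
print(row_id, len(features))
```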
Optionally, the Computation Layer can also create recommendations for all users. These are normally produced by the Serving Layer in real time, but can also be computed by the Computation Layer, which may be useful when using the Computation Layer by itself. The --recommend flag causes it to output recommendations in a recommend subdirectory (e.g. /acme/ads/0000000001/recommend). This directory also contains compressed text files. Each line has a user ID, followed by a tab, then comma-separated itemID:score pairs:
456 5802:0.9855,2059:0.97001,...
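As an illustration, one such line could be parsed like this. The sketch assumes decompressed text input, and parse_recommendation_line is a hypothetical helper name. Since the text above says similarItems/ uses an identical format, the same function should apply there with an item ID in the first column.

```python
def parse_recommendation_line(line: str) -> tuple[str, list[tuple[str, float]]]:
    """Split one line of recommend/ output into (user ID, [(item ID, score), ...])."""
    user_id, pairs = line.rstrip("\n").split("\t", 1)
    recommendations = []
    for pair in pairs.split(","):
        # Each pair is itemID:score
        item_id, score = pair.rsplit(":", 1)
        recommendations.append((item_id, float(score)))
    return user_id, recommendations


user_id, recs = parse_recommendation_line("456\t5802:0.9855,2059:0.97001")
print(user_id, recs)
```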
The Computation Layer can also compute the most similar items for all items. This is controlled by the --itemSimilarity flag, and produces output in similarItems/. The format is identical to the recommendations above.
Finally, the Computation Layer can compute user and item clusters using the k-means++ algorithm when --cluster is set. These are written as compressed text files to userClusters and itemClusters subdirectories. The format is the user or item ID, followed by tab, followed by a cluster number.
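To show how the cluster output might be consumed, the sketch below groups IDs by cluster number. It assumes decompressed text lines and an integer cluster number; the function name group_by_cluster and the sample lines are made up for illustration.

```python
from collections import defaultdict


def group_by_cluster(lines) -> dict[int, list[str]]:
    """Group user or item IDs by the cluster number on each line."""
    clusters = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        member_id, cluster = line.split("\t")
        clusters[int(cluster)].append(member_id)
    return dict(clusters)


# Hypothetical sample lines: ID, tab, cluster number
sample = ["123\t0", "456\t2", "789\t0"]
print(group_by_cluster(sample))
```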
Adding input
Input is normally added through a Serving Layer instance that is connected to the Computation Layer. Data that arrives at the pref or ingest method is copied into the latest generation's inbound/ directory. It is also possible to copy CSV input directly into the same location on HDFS, that is, into the most recent generation's inbound/ directory. The format is the same as that used in Apache Mahout and by the ingest method.
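The sketch below prepares CSV text of this kind. It assumes the Mahout-style layout userID,itemID with an optional third value column, which this page does not spell out, so verify it against the ingest documentation before relying on it. The function name to_csv and the sample IDs are hypothetical.

```python
import csv
import io


def to_csv(preferences) -> str:
    """Render (user ID, item ID, value-or-None) tuples as CSV text."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for user_id, item_id, value in preferences:
        if value is None:
            # Assumption: the value column may be omitted
            writer.writerow([user_id, item_id])
        else:
            writer.writerow([user_id, item_id, value])
    return buffer.getvalue()


print(to_csv([(123, 456, 1.0), (123, 789, None)]), end="")
```

The resulting file would then be copied into the latest generation's inbound/ directory with whatever HDFS or S3 tooling is already in use.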
Running the Computation Layer

The Computation Layer is run as a Java program. It is simply a client that invokes the computations on a Hadoop cluster (or Amazon EMR), monitors them, and performs other operations like cleanup. It exposes a web-based UI which gives status and information on its progress. It runs continuously and periodically creates and runs new computation generations. It can be configured to run new generations after an amount of time has passed, or after an amount of data has been written, or both.
A simple invocation looks like:
java -jar myrrix-computation-x.y.jar --bucket acme --instanceID ads --licenseFile ...
This runs a Computation Layer that manages bucket "acme", instance "ads" (that is, data in /acme/ads). A web-based UI is run by default on port 8080, and can be accessed on the local machine at http://localhost:8080.
The following command line arguments are available:
| Flag | Description | Default |
|---|---|---|
| --bucket | Bucket containing data to access for computation. May be any string. | |
| --instanceID | Instance ID to operate on within the bucket. May be any string, but in version 0.7 and before, must be numeric. | |
| --time | Time in minutes that will trigger a generation. This may alternatively be specified with a suffix indicating a different time unit. "5m" means 5 minutes; "2h" means 2 hours; "1d" means 1 day. Also, two values may be specified, separated by a comma; the first will be interpreted as the delay before the first scheduled run, and the other will be used as the delay between subsequent runs. Example: --time 1h,6h will schedule a run in 1 hour, and then every 6 hours after. | 1440 (1 day) |
| --data | New data in MB that will trigger a generation. This may alternatively be specified with a suffix indicating a different unit. "200m" means 200 megabytes; "10g" means 10 gigabytes; "1t" means 1 terabyte. | 1000 (1GB) |
| --recommend | If set, also computes recommendations for all users and outputs under recommend/ | Not set / false |
| --itemSimilarity | If set, also computes top most similar items for all items and outputs under similarItems/ | Not set / false |
| --cluster | If set, also computes clusters of users and items, in userClusters/ and itemClusters/, respectively. | Not set / false |
| --port | Port on which to listen for HTTP requests. Note that the server must be run as the root user to access port 80. | 8080 |
| --securePort | Port on which to listen for HTTPS requests. Likewise note that using port 443 requires running as root. | 8443 |
| --keystoreFile | File containing the SSL key to use for HTTPS. Note that setting this flag enables HTTPS connections, and so requires that option keystorePassword be set. If not set, will attempt to load a keystore file from the distributed file system, at sys/keystore.ks | |
| --keystorePassword | Password for keystoreFile. Setting this flag enables HTTPS connections. | |
| --userName | If specified, the user name required to authenticate to the server using HTTP DIGEST authentication. Requires password to be set. | |
| --password | Password for HTTP DIGEST authentication. Requires userName to be set. | |
| --consoleOnlyPassword | Only apply username and password to admin / console pages. | Not set / false |
| --licenseFile | Location of a license file named [subject].lic, where [subject] is the subject name authorized in the license. The license file should be valid at the time the app is run, and contain authorization to use the amount of parallelism (max simultaneous Hadoop workers) requested. | |
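The suffix rules for --time and --data can be illustrated with a small parser. This is a sketch of the rules as described in the table, not the Computation Layer's actual parsing code. It assumes 1 GB = 1000 MB (consistent with the stated default of "1000 (1GB)"), 1 TB = 1,000,000 MB, and that a single --time value is used both as the initial delay and as the interval; all function names are hypothetical.

```python
TIME_UNITS_IN_MINUTES = {"m": 1, "h": 60, "d": 24 * 60}
# Assumption: decimal units, as suggested by the default "1000 (1GB)"
DATA_UNITS_IN_MB = {"m": 1, "g": 1000, "t": 1000 * 1000}


def parse_with_suffix(text: str, units: dict[str, int]) -> int:
    """Parse a bare number, or a number followed by a one-letter unit suffix."""
    text = text.strip().lower()
    if text and text[-1] in units:
        return int(text[:-1]) * units[text[-1]]
    return int(text)


def parse_time(arg: str) -> tuple[int, int]:
    """Return (initial delay, interval between runs), both in minutes."""
    values = [parse_with_suffix(part, TIME_UNITS_IN_MINUTES) for part in arg.split(",")]
    if len(values) == 1:
        # Assumption: one value serves as both the delay and the interval
        return values[0], values[0]
    if len(values) == 2:
        return values[0], values[1]
    raise ValueError("expected one or two comma-separated values")


def parse_data(arg: str) -> int:
    """Return the data threshold in megabytes."""
    return parse_with_suffix(arg, DATA_UNITS_IN_MB)


print(parse_time("1h,6h"))  # (60, 360)
print(parse_data("10g"))    # 10000
```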
The following system properties may also be set on the command-line using Java's -Dkey=value syntax. These must appear before -jar. These are advanced settings that are not necessary to change.
| Key | Description | Default |
|---|---|---|
| model.als.iterations. | Estimated strength values in the original matrix change a little after each iteration, and less over time. If average absolute change in estimates is below this threshold, iteration will stop. | 0.001 |
| model.iterations.max | A hard limit of the number of iterations that will run in one build. | 30 |
| model.features | Number of features to use when creating the matrix factorization | 50 |
| model.als.lambda | Controls the lambda overfitting parameter in the ALS algorithm. This should not normally be changed. | 0.01 |
| model.als.alpha | Controls the alpha parameter in the ALS algorithm. This should not normally be changed. | 40 |
| model.decay.factor | Decay factor per generation for old data. For example, 0.95 means existing data's values are reduced by 5% each generation. | 1.0 (no decay) |
| model.decay.zeroThreshold | Below this value, data is considered to have decayed to zero and is removed. | 0.0 |
| model.cluster.user.k | Controls how many user clusters are computed with --cluster. Prior to version 0.11, this was set with model.cluster.k | 100 |
| model.cluster.item.k | Controls how many item clusters are computed with --cluster. Prior to version 0.11, this was set with model.cluster.k | 100 |
| model.generations.maxToKeep | How many generations to keep on HDFS/S3. Must be at least 2. When this is exceeded, older generation will be deleted. | 10 |
| output.recs.count | How many recommendations to output per user. | 10 |
| output.similar.count | How many similarities to output per item. | 10 |
Example:
java -Doutput.recs.count=100 -jar myrrix-computation-x.y.jar --bucket acme --instanceID ads --time 12h --recommend --port 80 --licenseFile ...
This runs a Computation Layer for bucket "acme" and instance ID "ads", as before. It computes 100 recommendations for all users with each generation. Every 12 hours it will attempt to run a new generation. The web-based UI runs on port 80 (note that it must run as root if using port 80).
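Of the system properties listed earlier, model.decay.factor and model.decay.zeroThreshold interact in a way that a short worked example may make clearer. The sketch below applies the decay once per generation and treats a value as removed once its magnitude falls strictly below the threshold. Comparing the absolute value is an assumption, and the function name decay is hypothetical; this is not the Computation Layer's actual code.

```python
from typing import Optional


def decay(value: float, factor: float = 1.0, zero_threshold: float = 0.0,
          generations: int = 1) -> Optional[float]:
    """Apply per-generation decay; return None once the value is removed."""
    for _ in range(generations):
        value *= factor
        # Assumption: magnitude strictly below the threshold counts as zero
        if abs(value) < zero_threshold:
            return None
    return value


# With factor 0.5 and threshold 0.3, a value of 1.0 survives one
# generation (0.5) and is removed in the second (0.25 < 0.3).
print(decay(1.0, factor=0.5, zero_threshold=0.3, generations=1))
print(decay(1.0, factor=0.5, zero_threshold=0.3, generations=2))
```

With the defaults (factor 1.0, threshold 0.0), values are unchanged and nothing is removed.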
Platform-specific Notes
Notes on Apache Hadoop
The following system properties may be set to affect how the Computation Layer runs:
| Key | Description | Default |
|---|---|---|
| mapred.reduce.tasks | The base number of reducers to use in each MapReduce. Each MapReduce will actually use a multiple of this value that tunes the value suitably for its output. Increase for large input if reduce tasks are taking a long time (e.g. more than 10 minutes) to complete. This should be at least as large as the number of available workers in the cluster times the number of reduce slots per worker. Note that this value is automatically configured when running on Amazon EMR, but may still be overridden to a higher value. | 4 |
Notes on Amazon EMR
When using Amazon EMR, use the JAR file built to work with Amazon's Web Services APIs, myrrix-computation-aws-x.y.jar. Note that the client will upload the JAR file itself to S3 at sys/myrrix.jar since it must be available remotely.
It is necessary to supply an AWS access key and secret key that are authorized to use the bucket and to run jobs on AWS. These are specified as system properties. For example:
java -Dstore.aws.accessKey=[access key] -Dstore.aws.secretKey=[secret key] -jar myrrix-computation-aws-x.y.jar --bucket acme --instanceID ads
They may also be specified as the standard AWS environment variables, AWS_ACCESS_KEY and AWS_SECRET_KEY.
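A launcher script might assemble this command as sketched below. The helper emr_command is hypothetical. It places the -D properties before -jar, as required for system properties, and otherwise checks that both environment variables are present. How the Computation Layer resolves the case where both are supplied is not stated here, so the sketch does not model it.

```python
import os


def emr_command(bucket, instance_id, access_key=None, secret_key=None, env=None):
    """Build the argument list for launching the AWS build of the Computation Layer."""
    env = os.environ if env is None else env
    command = ["java"]
    if access_key and secret_key:
        # System properties must appear before -jar
        command += [f"-Dstore.aws.accessKey={access_key}",
                    f"-Dstore.aws.secretKey={secret_key}"]
    elif not ("AWS_ACCESS_KEY" in env and "AWS_SECRET_KEY" in env):
        raise ValueError("AWS credentials must be passed explicitly or set in the environment")
    command += ["-jar", "myrrix-computation-aws-x.y.jar",
                "--bucket", bucket, "--instanceID", instance_id]
    return command


print(" ".join(emr_command("acme", "ads", "[access key]", "[secret key]")))
```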
The complete set of system properties that control the behavior of the Computation Layer on Amazon S3 and EMR is:
| Key | Description | Default |
|---|---|---|
| store.aws.accessKey | Amazon AWS access key. Required if env variable AWS_ACCESS_KEY is not set. | |
| store.aws.secretKey | Amazon AWS secret key. Required if env variable AWS_SECRET_KEY is not set. | |
| batch.emr.zone | Name of Amazon EMR availability zone to run in. It is strongly preferable to choose the same zone that the S3 bucket is hosted in. While it's possible to run in a different zone, it will incur additional charges from Amazon for the cross-zone S3 access. | us-east-1 |
| batch.emr.workers.count | How many instances to run (role CORE), as Hadoop MapReduce and HDFS workers in EMR | 1 |
| batch.emr.workers.tasks.count | Of the number of total workers (above), how many to instead run as MapReduce-only workers (role TASK). These will be requested from the EC2 spot market and may or may not run, but will cost less per hour to run. | 0 |
| batch.emr.workers.type | Amazon EC2 instance type to run as each worker. | m1.xlarge |
| batch.emr.workers.bid | Price, in dollars per hour, to bid for time on the Amazon EC2 spot market. If set, CORE workers may be interrupted, as they run as market type SPOT rather than ON_DEMAND instances. May be set to "auto" to bid the current lowest market spot price. | |
| batch.emr.debug | If true, Amazon EMR logging and debugging are enabled. Logs are available under the generation directory in logs, and the AWS console may be used to investigate the output of each worker. | false |
| batch.emr.supportedProduct | Sets EMR's "supported product" field, which can be used to select MapR's distribution of Hadoop by setting this to "mapr-m3" or "mapr-m5" | |
