Batch predictions

The batch prediction API provides a way to score large datasets using flexible options for intake and output on the Prediction Servers you have already deployed.

The main features are:

  • Flexible options for intake and output.

  • Stream local files: scoring starts while the file is still uploading, and results are downloaded at the same time.

  • Score large datasets from and to S3.

  • Connect to your database using JDBC with bidirectional streaming of scoring data and results.

  • Intake and output options can be mixed and do not need to match. For example, you can score from a JDBC source to an S3 target.

  • Protection against overloading your prediction servers with the option to control the concurrency level for scoring.

  • Prediction explanations can be included (with the option to add thresholds).

  • Passthrough columns are supported to correlate scored data with source data.

  • You can include prediction warnings in the output.

To interact with batch predictions, see the BatchPredictionJob class.

Make batch predictions with a deployment

DataRobot provides a utility function to make batch predictions using a deployment: Deployment.predict_batch.

import datarobot as dr

deployment = dr.Deployment.get(deployment_id='5c939e08962d741e34f609f0')
# Note: `source` can be a file path, a file, or a pandas DataFrame
prediction_results_as_dataframe = deployment.predict_batch(
    source="./my_local_file.csv",
)

Scoring local CSV files

DataRobot provides a utility function for scoring to and from local CSV files: BatchPredictionJob.score_to_file. The first parameter can be either:

  • A path to a CSV dataset

  • A file-like object

  • A Pandas DataFrame

For larger datasets, you should avoid using a DataFrame, as it loads the entire dataset into memory. The other options do not.

import datarobot as dr

deployment_id = '5dc5b1015e6e762a6241f9aa'

dr.BatchPredictionJob.score_to_file(
    deployment_id,
    './data_to_predict.csv',
    './predicted.csv',
)

The input file is streamed to DataRobot’s API and scoring starts immediately. Results are downloaded as soon as they become available. The call blocks until the entire file has been scored.

Scoring from and to S3

DataRobot provides a small utility function for scoring to and from CSV files hosted on S3: BatchPredictionJob.score_s3. This requires that the intake and output buckets share the same credentials (see Credentials and Credential.create_s3) or that their access policy is set to public.

Note that the S3 output functionality has a limit of 100 GB.

import datarobot as dr

deployment_id = '5dc5b1015e6e762a6241f9aa'

cred = dr.Credential.get('5a8ac9ab07a57a0001be501f')

job = dr.BatchPredictionJob.score_s3(
    deployment=deployment_id,
    source_url='s3://mybucket/data_to_predict.csv',
    destination_url='s3://mybucket/predicted.csv',
    credential=cred,
)

Scoring from and to Azure Cloud Storage

DataRobot provides the same support for Azure through the utility function BatchPredictionJob.score_azure. This requires that you add an Azure connection string to the DataRobot credentials store. (See Credentials and Credential.create_azure.)

import datarobot as dr

deployment_id = '5dc5b1015e6e762a6241f9aa'

cred = dr.Credential.get('5a8ac9ab07a57a0001be501f')

job = dr.BatchPredictionJob.score_azure(
    deployment=deployment_id,
    source_url='https://mybucket.blob.core.windows.net/bucket/data_to_predict.csv',
    destination_url='https://mybucket.blob.core.windows.net/results/predicted.csv',
    credential=cred,
)

Scoring from and to Google Cloud Platform

DataRobot provides the same support for GCP through the utility function BatchPredictionJob.score_gcp. It requires you to add a GCP connection string to the DataRobot credentials store. (See Credentials and Credential.create_gcp.)

import datarobot as dr

deployment_id = '5dc5b1015e6e762a6241f9aa'

cred = dr.Credential.get('5a8ac9ab07a57a0001be501f')

job = dr.BatchPredictionJob.score_gcp(
    deployment=deployment_id,
    source_url='gs://bucket/data_to_predict.csv',
    destination_url='gs://results/predicted.csv',
    credential=cred,
)

Manually configure a batch prediction job

If none of the utilities above fit your use case, you can configure the job manually. This requires configuring an intake option and an output option. Credentials can be created with the Credentials API.

import datarobot as dr

deployment_id = '5dc5b1015e6e762a6241f9aa'

dr.BatchPredictionJob.score(
    deployment_id,
    intake_settings={
        'type': 's3',
        'url': 's3://public-bucket/data_to_predict.csv',
        'credential_id': '5a8ac9ab07a57a0001be501f',
    },
    output_settings={
        'type': 'localFile',
        'path': './predicted.csv',
    },
)
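
Because intake and output settings are plain dictionaries, they can be assembled by small helpers before being passed to BatchPredictionJob.score. The following is a minimal sketch only: `s3_settings` and `local_file_output` are hypothetical helper names, not part of the DataRobot client, and they use only the keys shown in the example above.

```python
from typing import Optional


def s3_settings(url: str, credential_id: Optional[str] = None) -> dict:
    """Build an S3 settings dict (hypothetical helper, not a DataRobot API)."""
    if not url.startswith("s3://"):
        raise ValueError(f"expected an s3:// URL, got {url!r}")
    settings = {"type": "s3", "url": url}
    # Only include credential_id when one is supplied (e.g. private buckets).
    if credential_id is not None:
        settings["credential_id"] = credential_id
    return settings


def local_file_output(path: str) -> dict:
    """Build a local-file output settings dict (hypothetical helper)."""
    return {"type": "localFile", "path": path}


intake = s3_settings(
    "s3://public-bucket/data_to_predict.csv",
    credential_id="5a8ac9ab07a57a0001be501f",
)
output = local_file_output("./predicted.csv")
# These dicts would then be passed as intake_settings= and output_settings=
# to dr.BatchPredictionJob.score(deployment_id, ...).
```

Keeping the construction in one place makes it easier to mix intake and output types without repeating dictionary literals.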

Supported intake types

The following sections outline the supported intake types and describe their configuration parameters:

Local file intake

Local file intake requires you to pass either a path to a CSV dataset, a file-like object, or a Pandas DataFrame as the file parameter:

intake_settings={
    'type': 'localFile',
    'file': './data_to_predict.csv',
}

S3 CSV intake

S3 CSV intake requires you to pass an S3 URL to the CSV file to be scored in the url parameter:

intake_settings={
    'type': 's3',
    'url': 's3://public-bucket/data_to_predict.csv',
}

If the bucket is not publicly accessible, you can supply AWS credentials using the following parameters:

  • aws_access_key_id

  • aws_secret_access_key

  • aws_session_token

Save these to the Credential API and reference the stored credential by its ID:

import datarobot as dr

# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

intake_settings={
    'type': 's3',
    'url': 's3://private-bucket/data_to_predict.csv',
    'credential_id': cred.credential_id,
}

JDBC intake

JDBC intake requires you to create a DataStore and Credential for your database:

# get to make sure it exists
datastore_id = '5a8ac9ab07a57a0001be5010'
data_store = dr.DataStore.get(datastore_id)

credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

intake_settings = {
    'type': 'jdbc',
    'table': 'table_name',
    'schema': 'public', # optional, if supported by database
    'catalog': 'master', # optional, if supported by database
    'data_store_id': data_store.id,
    'credential_id': cred.credential_id,
}

BigQuery intake

BigQuery intake requires you to create a GCS Credential for your database:

# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

intake_settings = {
    'type': 'bigquery',
    'dataset': 'dataset_name',
    'table': 'table_or_view_name',
    'bucket': 'bucket_in_gcs',
    'credential_id': cred.credential_id,
}

AI Catalog intake

AI Catalog intake requires you to create a Dataset and identify the dataset_id to use as an input.

# get to make sure it exists
dataset_id = '5a8ac9ab07a57a0001be501f'
dataset = dr.Dataset.get(dataset_id)

intake_settings={
    'type': 'dataset',
    'dataset': dataset
}

Or, if you want a version_id other than the latest, supply your own.

# get to make sure it exists
dataset_id = '5a8ac9ab07a57a0001be501f'
dataset = dr.Dataset.get(dataset_id)

intake_settings={
    'type': 'dataset',
    'dataset': dataset,
    'dataset_version_id': 'another_version_id'
}

Datasphere intake

Datasphere intake requires you to create a DataStore and Credential for your database:

# get to make sure it exists
datastore_id = '5a8ac9ab07a57a0001be5011'
data_store = dr.DataStore.get(datastore_id)

credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

intake_settings = {
    'type': 'datasphere',
    'table': 'table_name',
    'schema': 'DATASPHERE_SPACE_NAME',
    'data_store_id': data_store.id,
    'credential_id': cred.credential_id,
}

Supported output types

The sections below outline the supported output types and descriptions of their configuration parameters.

Local file output

For local file output, you have two options.

  1. Pass a path parameter and have the client block and download the scored data as it is produced. This is the fastest way to get predictions because uploading, scoring, and downloading happen concurrently:

output_settings={
    'type': 'localFile',
    'path': './predicted.csv',
}

  2. Alternatively, leave out the path parameter and subsequently call BatchPredictionJob.download. The BatchPredictionJob.score call then returns as soon as the upload is complete.

If the job is not finished scoring, the call to BatchPredictionJob.download will start streaming the data that has been scored so far and block until more data is available.

You can poll for job completion using BatchPredictionJob.get_status or use BatchPredictionJob.wait_for_completion to wait.

import datarobot as dr

deployment_id = '5dc5b1015e6e762a6241f9aa'

job = dr.BatchPredictionJob.score(
    deployment_id,
    intake_settings={
        'type': 'localFile',
        'file': './data_to_predict.csv',
    },
    output_settings={
        'type': 'localFile',
    },
)

job.wait_for_completion()

with open('./predicted.csv', 'wb') as f:
    job.download(f)

S3 CSV output

S3 CSV output requires you to pass an S3 URL to the CSV file where the scored data should be saved in the url parameter:

output_settings={
    'type': 's3',
    'url': 's3://public-bucket/predicted.csv',
}

Most likely, the bucket is not publicly accessible for writes, but you can supply AWS credentials using these parameters:

  • aws_access_key_id

  • aws_secret_access_key

  • aws_session_token

Save these to the Credential API and reference the stored credential by its ID. Here is an example:

# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

output_settings={
    'type': 's3',
    'url': 's3://private-bucket/predicted.csv',
    'credential_id': cred.credential_id,
}

JDBC output

Just as for the input, JDBC output requires you to create a DataStore and Credential for your database, but for output_settings you also need to specify statement_type, which should be one of datarobot.enums.AVAILABLE_STATEMENT_TYPES:

# get to make sure it exists
datastore_id = '5a8ac9ab07a57a0001be5010'
data_store = dr.DataStore.get(datastore_id)

credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

output_settings = {
    'type': 'jdbc',
    'table': 'table_name',
    'schema': 'public', # optional, if supported by database
    'catalog': 'master', # optional, if supported by database
    'statement_type': 'insert',
    'data_store_id': data_store.id,
    'credential_id': cred.credential_id,
}
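
The JDBC output settings can also be assembled programmatically. The sketch below is illustrative: `jdbc_output_settings` is a hypothetical helper name, and the allowed statement types are supplied by the caller (for instance, the values listed in datarobot.enums.AVAILABLE_STATEMENT_TYPES) rather than hard-coded, since only 'insert' appears in this document.

```python
from typing import Iterable, Optional


def jdbc_output_settings(
    table: str,
    statement_type: str,
    data_store_id: str,
    credential_id: str,
    allowed_statement_types: Iterable[str],
    schema: Optional[str] = None,
    catalog: Optional[str] = None,
) -> dict:
    """Assemble a JDBC output_settings dict (hypothetical helper)."""
    allowed = set(allowed_statement_types)
    if statement_type not in allowed:
        raise ValueError(f"statement_type must be one of {sorted(allowed)}")
    settings = {
        "type": "jdbc",
        "table": table,
        "statement_type": statement_type,
        "data_store_id": data_store_id,
        "credential_id": credential_id,
    }
    # schema and catalog are optional; include them only when provided.
    if schema is not None:
        settings["schema"] = schema
    if catalog is not None:
        settings["catalog"] = catalog
    return settings


# "insert" is the only statement type shown in this document; in practice,
# pass the full set supported by your client version.
settings = jdbc_output_settings(
    table="table_name",
    statement_type="insert",
    data_store_id="5a8ac9ab07a57a0001be5010",
    credential_id="5a8ac9ab07a57a0001be501f",
    allowed_statement_types=["insert"],
    schema="public",
)
```

Validating statement_type locally surfaces a typo before the job is submitted.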

BigQuery output

Just as for the input, BigQuery requires you to create a GCS Credential to access BigQuery:

# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

output_settings = {
    'type': 'bigquery',
    'dataset': 'dataset_name',
    'table': 'table_name',
    'bucket': 'bucket_in_gcs',
    'credential_id': cred.credential_id,
}

Datasphere output

Same as for the input, this requires you to create a DataStore and Credential for your database:

# get to make sure it exists
datastore_id = '5a8ac9ab07a57a0001be5010'
data_store = dr.DataStore.get(datastore_id)

credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)

output_settings = {
    'type': 'datasphere',
    'table': 'table_name',
    'schema': 'DATASPHERE_SPACE_NAME',
    'data_store_id': data_store.id,
    'credential_id': cred.credential_id,
}

Copy a previously submitted job

To submit a job using parameters from a job that was previously submitted, use BatchPredictionJob.score_from_existing. The first parameter is the job ID of another job.

import datarobot as dr

previously_submitted_job_id = '5dc5b1015e6e762a6241f9aa'

dr.BatchPredictionJob.score_from_existing(
    previously_submitted_job_id,
)

Scoring an in-memory Pandas DataFrame

When working with DataFrames, DataRobot provides a method for scoring the data without first writing it to a CSV file and subsequently reading the data back from a CSV file: BatchPredictionJob.score_pandas.

This method also joins the computed predictions into the existing DataFrame. The first parameter is the deployment ID and the second is the DataFrame to score.

import datarobot as dr
import pandas as pd

deployment_id = '5dc5b1015e6e762a6241f9aa'

df = pd.read_csv('testdata/titanic_predict.csv')

job, df = dr.BatchPredictionJob.score_pandas(deployment_id, df)

The method returns a copy of the job status and the updated DataFrame with the predictions added. Your DataFrame now contains the following extra columns:

  • Survived_1_PREDICTION

  • Survived_0_PREDICTION

  • Survived_PREDICTION

  • THRESHOLD

  • POSITIVE_CLASS

  • prediction_status

print(df)
     PassengerId  Pclass                                          Name  ... Survived_PREDICTION  THRESHOLD  POSITIVE_CLASS
0            892       3                              Kelly, Mr. James  ...                   0        0.5               1
1            893       3              Wilkes, Mrs. James (Ellen Needs)  ...                   1        0.5               1
2            894       2                     Myles, Mr. Thomas Francis  ...                   0        0.5               1
3            895       3                              Wirz, Mr. Albert  ...                   0        0.5               1
4            896       3  Hirvonen, Mrs. Alexander (Helga E Lindqvist)  ...                   1        0.5               1
..           ...     ...                                           ...  ...                 ...        ...             ...
413         1305       3                            Spector, Mr. Woolf  ...                   0        0.5               1
414         1306       1                  Oliva y Ocana, Dona. Fermina  ...                   0        0.5               1
415         1307       3                  Saether, Mr. Simon Sivertsen  ...                   0        0.5               1
416         1308       3                           Ware, Mr. Frederick  ...                   0        0.5               1
417         1309       3                      Peter, Master. Michael J  ...                   1        0.5               1

[418 rows x 16 columns]

If you don’t want all of them or if you’re not happy with the names of the added columns, they can be modified using column remapping:

import datarobot as dr
import pandas as pd

deployment_id = '5dc5b1015e6e762a6241f9aa'

df = pd.read_csv('testdata/titanic_predict.csv')

job, df = dr.BatchPredictionJob.score_pandas(
    deployment_id,
    df,
    column_names_remapping={
        'Survived_1_PREDICTION': None,       # discard column
        'Survived_0_PREDICTION': None,       # discard column
        'Survived_PREDICTION': 'predicted',  # rename column
        'THRESHOLD': None,                   # discard column
        'POSITIVE_CLASS': None,              # discard column
    },
)

Any column mapped to None will be discarded. Any column mapped to a string will be renamed. Any column not mentioned will be kept in the output untouched. Your DataFrame now contains the following extra columns:

  • predicted

  • prediction_status
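
The three remapping rules can be illustrated on a plain list of column names. This is a local sketch of the rules as stated above, not the client's implementation; `apply_column_remapping` is a hypothetical function name.

```python
from typing import Dict, List, Optional


def apply_column_remapping(
    columns: List[str], remapping: Dict[str, Optional[str]]
) -> List[str]:
    """Return the column names that remain after applying the remapping."""
    result = []
    for name in columns:
        if name not in remapping:
            result.append(name)             # not mentioned: kept untouched
        elif remapping[name] is not None:
            result.append(remapping[name])  # mapped to a string: renamed
        # mapped to None: discarded
    return result


added_columns = [
    "Survived_1_PREDICTION",
    "Survived_0_PREDICTION",
    "Survived_PREDICTION",
    "THRESHOLD",
    "POSITIVE_CLASS",
    "prediction_status",
]
remapping = {
    "Survived_1_PREDICTION": None,
    "Survived_0_PREDICTION": None,
    "Survived_PREDICTION": "predicted",
    "THRESHOLD": None,
    "POSITIVE_CLASS": None,
}
remaining = apply_column_remapping(added_columns, remapping)
print(remaining)  # ['predicted', 'prediction_status']
```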

Refer to the documentation for BatchPredictionJob.score to see the full range of available options.

Batch prediction job definitions

To submit a working Batch Prediction job, you must supply a variety of elements to the datarobot.models.BatchPredictionJob.score() request payload depending on what type of prediction is required. Additionally, you must consider the type of intake and output adapters used for a given job.

Each time a new batch prediction job is created, all of this information must be resubmitted, so it has to be stored somewhere outside of DataRobot.

Note

The name parameter must be unique across your organization. If you attempt to create multiple definitions with the same name, the request will fail. To free up a name, first remove the existing definition with datarobot.models.BatchPredictionJobDefinition.delete() before creating the new one. Alternatively, give the existing definition a new name with datarobot.models.BatchPredictionJobDefinition.update().

For example, a request could look like:

import datarobot as dr

deployment_id = "5dc5b1015e6e762a6241f9aa"

job = dr.BatchPredictionJob.score(
    deployment_id,
    intake_settings={
        "type": "s3",
        "url": "s3://bucket/container/file.csv",
        "credential_id": "5dc5b1015e6e762a6241f9bb"
    },
    output_settings={
        "type": "s3",
        "url": "s3://bucket/container/output.csv",
        "credential_id": "5dc5b1015e6e762a6241f9bb"
    },
)

job.wait_for_completion()

with open("./predicted.csv", "wb") as f:
    job.download(f)

Job definitions

If your use case requires the same (or similar) type(s) of predictions to be made multiple times, you can choose to create a Job Definition of the batch prediction job and store it for future use.

The method for creating job definitions is datarobot.models.BatchPredictionJobDefinition.create(), which includes the enabled, name, and schedule parameters.

>>> import datarobot as dr
>>> job_spec = {
...    "num_concurrent": 4,
...    "deployment_id": "5dc5b1015e6e762a6241f9aa",
...    "intake_settings": {
...        "url": "s3://foobar/123",
...        "type": "s3",
...        "format": "csv",
...        "credential_id": "5dc5b1015e6e762a6241f9bb"
...    },
...    "output_settings": {
...        "url": "s3://foobar/123",
...        "type": "s3",
...        "format": "csv",
...        "credential_id": "5dc5b1015e6e762a6241f9bb"
...    },
... }
>>> definition = dr.BatchPredictionJobDefinition.create(
...    enabled=False,
...    batch_prediction_job=job_spec,
...    name="some_definition_name",
...    schedule=None
... )
>>> definition
BatchPredictionJobDefinition(foobar)

Execute a job definition

Manual job execution

To submit a stored job definition for scoring, you can either do so on a scheduled basis, described below, or manually submit the definition ID using datarobot.models.BatchPredictionJobDefinition.run_once():

>>> import datarobot as dr
>>> definition = dr.BatchPredictionJobDefinition.get("5dc5b1015e6e762a6241f9aa")
>>> job = definition.run_once()
>>> job.wait_for_completion()

Scheduled job execution

A scheduled batch prediction job works just like a regular batch prediction job, except that DataRobot handles its execution.

In order to schedule the execution of a batch prediction job, a definition must first be created using datarobot.models.BatchPredictionJobDefinition.create(), or updated using datarobot.models.BatchPredictionJobDefinition.update(). In this case, enabled is set to True and a schedule payload is provided.

Alternatively, use a shorthand version with datarobot.models.BatchPredictionJobDefinition.run_on_schedule():

>>> import datarobot as dr
>>> schedule = {
...    "day_of_week": [
...        1
...    ],
...    "month": [
...        "*"
...    ],
...    "hour": [
...        16
...    ],
...    "minute": [
...        0
...    ],
...    "day_of_month": [
...        1
...    ]
... }
>>> definition = dr.BatchPredictionJobDefinition.get("5dc5b1015e6e762a6241f9aa")
>>> job = definition.run_on_schedule(schedule)

If the created job was not enabled previously, this method will also enable it.

The schedule payload

The schedule payload defines at what intervals the job should run. Its elements can be combined in various ways to construct complex scheduling terms if needed. For each element, you can supply either ["*"], meaning every value of that time unit, or an array of integers (e.g. [1, 2, 3]) to select specific values.

The schedule payload elements

Key: minute
Possible values: ["*"] or [0 ... 59]
Example: [15, 30, 45]
Description: The job will run at these minute values for every hour of the day.

Key: hour
Possible values: ["*"] or [0 ... 23]
Example: [12, 23]
Description: The hour(s) of the day that the job will run.

Key: month
Possible values: ["*"] or [1 ... 12]
Example: ["jan"]
Description: Strings, either 3-letter abbreviations or the full name of the month, can be used interchangeably (e.g., “jan” or “october”).

Months that are not compatible with day_of_month are ignored, for example {"day_of_month": [31], "month": ["feb"]}.

Key: day_of_week
Possible values: ["*"] or [0 ... 6], where Sunday=0
Example: ["sun"]
Description: The day(s) of the week that the job will run. Strings, either 3-letter abbreviations or the full name of the day, can be used interchangeably (e.g., “sunday”, “Sunday”, “sun”, or “Sun” all map to [0]).

NOTE: This field is additive with day_of_month, meaning the job will run both on the date specified by day_of_month and the day defined in this field.

Key: day_of_month
Possible values: ["*"] or [1 ... 31]
Example: [1, 25]
Description: The date(s) of the month that the job will run. Allowed values are either [1 ... 31] or ["*"] for all days of the month.

NOTE: This field is additive with day_of_week, meaning the job will run both on the date(s) defined in this field and the day specified by day_of_week (for example, dates 1st, 2nd, 3rd, plus every Tuesday). If day_of_month is set to ["*"] and day_of_week is defined, the scheduler will trigger on every day of the month that matches day_of_week (for example, Tuesday the 2nd, 9th, 16th, 23rd, 30th).

Invalid dates such as February 31st are ignored.
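
To make the additive behavior of day_of_week and day_of_month concrete, the sketch below checks locally whether a given datetime would match a schedule payload. It is an illustration of the rules described above, not DataRobot's scheduler. The function names are hypothetical, and the code assumes that all five keys are present, that values are integers or ["*"] (string names such as "jan" are not handled), and that a restricted day_of_month combined with day_of_week set to ["*"] matches on the date only.

```python
from datetime import datetime

# Inclusive integer ranges taken from the schedule payload elements above.
RANGES = {
    "minute": (0, 59),
    "hour": (0, 23),
    "month": (1, 12),
    "day_of_week": (0, 6),   # Sunday = 0
    "day_of_month": (1, 31),
}


def validate_schedule(schedule: dict) -> None:
    """Raise ValueError if a field is missing or a value is out of range."""
    for key, (low, high) in RANGES.items():
        values = schedule.get(key)
        if not values:
            raise ValueError(f"missing or empty field: {key}")
        if values == ["*"]:
            continue
        for v in values:
            if not isinstance(v, int) or not low <= v <= high:
                raise ValueError(f"{key}: {v!r} is outside {low}..{high}")


def _matches(values: list, actual: int) -> bool:
    return values == ["*"] or actual in values


def schedule_matches(schedule: dict, when: datetime) -> bool:
    """Check whether `when` falls on the schedule (minute resolution)."""
    validate_schedule(schedule)
    if not (
        _matches(schedule["minute"], when.minute)
        and _matches(schedule["hour"], when.hour)
        and _matches(schedule["month"], when.month)
    ):
        return False
    dow = (when.weekday() + 1) % 7  # Python: Monday=0; payload: Sunday=0
    dom_values = schedule["day_of_month"]
    dow_values = schedule["day_of_week"]
    if dom_values == ["*"]:
        return _matches(dow_values, dow)
    if dow_values == ["*"]:
        return when.day in dom_values
    # Both fields are restricted: additive, so either match triggers the job.
    return when.day in dom_values or dow in dow_values


schedule = {
    "day_of_week": [1],
    "month": ["*"],
    "hour": [16],
    "minute": [0],
    "day_of_month": [1],
}
# Monday, January 8th 2024 at 16:00 matches through day_of_week.
print(schedule_matches(schedule, datetime(2024, 1, 8, 16, 0)))  # True
```

With this payload, the job matches on the 1st of every month and on every Monday, always at 16:00.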

Disable a scheduled job

Job definitions are only executed by the scheduler if enabled is set to True. If you have a job definition that was previously running as a scheduled job but should now be stopped, either call datarobot.models.BatchPredictionJobDefinition.delete() to remove it completely, or call datarobot.models.BatchPredictionJobDefinition.update() with enabled=False to keep the definition but stop the scheduled job from executing at intervals. A job that is currently running will finish execution regardless.

>>> import datarobot as dr
>>> definition = dr.BatchPredictionJobDefinition.get("5dc5b1015e6e762a6241f9aa")
>>> definition.delete()
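
To keep the definition and only stop the scheduled runs, the alternative mentioned above is to update it with enabled=False. A minimal sketch, assuming `definition` is an object obtained from dr.BatchPredictionJobDefinition.get; `pause_definition` is a hypothetical wrapper name, not part of the client.

```python
def pause_definition(definition):
    """Stop scheduled runs while keeping the stored definition.

    `definition` is expected to be a BatchPredictionJobDefinition; this
    wrapper only forwards enabled=False to its update() method.
    """
    return definition.update(enabled=False)


# Usage (requires a configured DataRobot client):
#   import datarobot as dr
#   definition = dr.BatchPredictionJobDefinition.get("5dc5b1015e6e762a6241f9aa")
#   pause_definition(definition)
```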