Batch Predictions¶
The Batch Prediction API provides a way to score large datasets using flexible options for intake and output on the Prediction Servers you have already deployed.
The main features are:
Flexible options for intake and output.
Stream local files and start scoring while still uploading - while simultaneously downloading the results.
Score large datasets from and to S3.
Connect to your database using JDBC with bidirectional streaming of scoring data and results.
Intake and output options can be mixed and doesn’t need to match. So scoring from a JDBC source to an S3 target is also an option.
Protection against overloading your prediction servers with the option to control the concurrency level for scoring.
Prediction Explanations can be included (with option to add thresholds).
Passthrough Columns are supported to correlate scored data with source data.
Prediction Warnings can be included in the output.
To interact with Batch Predictions, you should use the BatchPredictionJob class.
Make batch predictions with a deployment¶
DataRobot provides a utility function to make batch predictions using a deployment: Deployment.predict_batch
.
import datarobot as dr
deployment = dr.Deployment.get(deployment_id='5c939e08962d741e34f609f0')
# To note: `source` can be a file path, a file or a pandas DataFrame
prediction_results_as_dataframe = deployment.predict_batch(
source="./my_local_file.csv",
)
Scoring local CSV files¶
We provide a small utility function for scoring from/to local CSV files: BatchPredictionJob.score_to_file
.
The first parameter can be either:
Path to a CSV dataset
File-like object
Pandas DataFrame
For larger datasets, you should avoid using a DataFrame, as that will load the entire dataset into memory. The other options don’t.
import datarobot as dr
deployment_id = '5dc5b1015e6e762a6241f9aa'
dr.BatchPredictionJob.score_to_file(
deployment_id,
'./data_to_predict.csv',
'./predicted.csv',
)
The input file will be streamed to our API and scoring will start immediately. As soon as results start coming in, we will initiate the download concurrently. The entire call will block until the file has been scored.
Scoring from and to S3¶
We provide a small utility function for scoring from/to CSV files hosted on S3 BatchPredictionJob.score_s3
.
This requires that the intake and output buckets share the same credentials (see Credentials
and Credential.create_s3
) or that their access policy is set to public:
import datarobot as dr
deployment_id = '5dc5b1015e6e762a6241f9aa'
cred = dr.Credential.get('5a8ac9ab07a57a0001be501f')
job = dr.BatchPredictionJob.score_s3(
deployment=deployment_id,
source_url='s3://mybucket/data_to_predict.csv',
destination_url='s3://mybucket/predicted.csv',
credential=cred,
)
Note
The S3 output functionality has a limit of 100 GB.
Scoring from and to Azure Cloud Storage¶
Like with S3, we provide the same support for Azure through the utility function BatchPredictionJob.score_azure
.
This required that an Azure connection string has been added to the DataRobot credentials store.
(see Credentials and Credential.create_azure
)
import datarobot as dr
deployment_id = '5dc5b1015e6e762a6241f9aa'
cred = dr.Credential.get('5a8ac9ab07a57a0001be501f')
job = dr.BatchPredictionJob.score_azure(
deployment=deployment_id,
source_url='https://mybucket.blob.core.windows.net/bucket/data_to_predict.csv',
destination_url='https://mybucket.blob.core.windows.net/results/predicted.csv',
credential=cred,
)
Scoring from and to Google Cloud Platform¶
Like with Azure, we provide the same support for GCP through the utility function BatchPredictionJob.score_gcp
.
This required that an Azure connection string has been added to the DataRobot credentials store. (see Credentials and
Credential.create_gcp
)
import datarobot as dr
deployment_id = '5dc5b1015e6e762a6241f9aa'
cred = dr.Credential.get('5a8ac9ab07a57a0001be501f')
job = dr.BatchPredictionJob.score_gcp(
deployment=deployment_id,
source_url='gs:/bucket/data_to_predict.csv',
destination_url='gs://results/predicted.csv',
credential=cred,
)
Wiring a Batch Prediction Job manually¶
If you can’t use any of the utilities above, you are also free to configure your job manually. This requires configuring an intake and output option:
import datarobot as dr
deployment_id = '5dc5b1015e6e762a6241f9aa'
dr.BatchPredictionJob.score(
deployment_id,
intake_settings={
'type': 's3',
'url': 's3://public-bucket/data_to_predict.csv',
'credential_id': '5a8ac9ab07a57a0001be501f',
},
output_settings={
'type': 'localFile',
'path': './predicted.csv',
},
)
Credentials may be created with Credentials API.
Supported intake types¶
These are the supported intake types and descriptions of their configuration parameters:
Local file intake¶
This requires you to pass either a path to a CSV dataset, file-like object or a Pandas
DataFrame as the file
parameter:
intake_settings={
'type': 'localFile',
'file': './data_to_predict.csv',
}
S3 CSV intake¶
This requires you to pass an S3 URL to the CSV file your scoring in the url
parameter:
intake_settings={
'type': 's3',
'url': 's3://public-bucket/data_to_predict.csv',
}
If the bucket is not publicly accessible, you can supply AWS credentials using the three parameters:
aws_access_key_id
aws_secret_access_key
aws_session_token
And save it to the Credential API. Here is an example:
import datarobot as dr
# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)
intake_settings={
'type': 's3',
'url': 's3://private-bucket/data_to_predict.csv',
'credential_id': cred.credential_id,
}
JDBC intake¶
This requires you to create a DataStore and Credential for your database:
# get to make sure it exists
datastore_id = '5a8ac9ab07a57a0001be5010'
data_store = dr.DataStore.get(datastore_id)
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)
intake_settings = {
'type': 'jdbc',
'table': 'table_name',
'schema': 'public', # optional, if supported by database
'catalog': 'master', # optional, if supported by database
'data_store_id': data_store.id,
'credential_id': cred.credential_id,
}
BigQuery intake¶
This requires you to create a GCS Credential for your database:
# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)
intake_settings = {
'type': 'bigquery',
'dataset': 'dataset_name',
'table': 'table_or_view_name',
'bucket': 'bucket_in_gcs',
'credential_id': cred.credential_id,
}
AI Catalog intake¶
This requires you to create a Dataset and identify the dataset_id of that to use as input.
# get to make sure it exists
dataset_id = '5a8ac9ab07a57a0001be501f'
dataset = dr.Dataset.get(dataset_id)
intake_settings={
'type': 'dataset',
'dataset': dataset
}
Or, in case you want another version_id than the latest, supply your own.
# get to make sure it exists
dataset_id = '5a8ac9ab07a57a0001be501f'
dataset = dr.Dataset.get(dataset_id)
intake_settings={
'type': 'dataset',
'dataset': dataset,
'dataset_version_id': 'another_version_id'
}
Supported output types¶
These are the supported output types and descriptions of their configuration parameters:
Local file output¶
For local file output you have two options. You can either pass a path
parameter and
have the client block and download the scored data concurrently. This is the fastest way
to get predictions as it will upload, score and download concurrently:
output_settings={
'type': 'localFile',
'path': './predicted.csv',
}
Another option is to leave out the parameter and subsequently call BatchPredictionJob.download
at your own convenience. The BatchPredictionJob.score
call will then return as soon as the upload is complete.
If the job is not finished scoring, the call to BatchPredictionJob.download
will start
streaming the data that has been scored so far and block until more data is available.
You can poll for job completion using BatchPredictionJob.get_status
or use
BatchPredictionJob.wait_for_completion
to wait.
import datarobot as dr
deployment_id = '5dc5b1015e6e762a6241f9aa'
job = dr.BatchPredictionJob.score(
deployment_id,
intake_settings={
'type': 'localFile',
'file': './data_to_predict.csv',
},
output_settings={
'type': 'localFile',
},
)
job.wait_for_completion()
with open('./predicted.csv', 'wb') as f:
job.download(f)
S3 CSV output¶
This requires you to pass an S3 URL to the CSV file where the scored data should be saved
to in the url
parameter:
output_settings={
'type': 's3',
'url': 's3://public-bucket/predicted.csv',
}
Most likely, the bucket is not publicly accessible for writes, but you can supply AWS credentials using the three parameters:
aws_access_key_id
aws_secret_access_key
aws_session_token
And save it to the Credential API. Here is an example:
# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)
output_settings={
'type': 's3',
'url': 's3://private-bucket/predicted.csv',
'credential_id': cred.credential_id,
}
JDBC output¶
Same as for the input, this requires you to create a DataStore and
Credential for your database, but for output_settings you also need to specify
statementType, which should be one of datarobot.enums.AVAILABLE_STATEMENT_TYPES
:
# get to make sure it exists
datastore_id = '5a8ac9ab07a57a0001be5010'
data_store = dr.DataStore.get(datastore_id)
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)
output_settings = {
'type': 'jdbc',
'table': 'table_name',
'schema': 'public', # optional, if supported by database
'catalog': 'master', # optional, if supported by database
'statementType': 'insert',
'data_store_id': data_store.id,
'credential_id': cred.credential_id,
}
BigQuery output¶
Same as for the input, this requires you to create a GCS Credential to access BigQuery:
# get to make sure it exists
credential_id = '5a8ac9ab07a57a0001be501f'
cred = dr.Credential.get(credential_id)
output_settings = {
'type': 'bigquery',
'dataset': 'dataset_name',
'table': 'table_name',
'bucket': 'bucket_in_gcs',
'credential_id': cred.credential_id,
}
Copying a previously submitted job¶
We provide a small utility function for submitting a job using parameters from a job previously submitted:
BatchPredictionJob.score_from_existing
.
The first parameter is the job id of another job.
import datarobot as dr
previously_submitted_job_id = '5dc5b1015e6e762a6241f9aa'
dr.BatchPredictionJob.score_from_existing(
previously_submitted_job_id,
)
Scoring an in-memory Pandas DataFrame¶
When working with DataFrames, we provide a method for scoring the data without first writing it to a CSV file and subsequently reading the data back from a CSV file.
This will also take care of joining the computed predictions into the existing DataFrame.
Use the method BatchPredictionJob.score_pandas
.
The first parameter is the deployment ID and then the DataFrame to score.
import datarobot as dr
import pandas as pd
deployment_id = '5dc5b1015e6e762a6241f9aa'
df = pd.read_csv('testdata/titanic_predict.csv')
job, df = dr.BatchPredictionJob.score_pandas(deployment_id, df)
The method returns a copy of the job status and the updated DataFrame with the predictions added. So your DataFrame will now contain the following extra columns:
Survived_1_PREDICTION
Survived_0_PREDICTION
Survived_PREDICTION
THRESHOLD
POSITIVE_CLASS
prediction_status
print(df)
PassengerId Pclass Name ... Survived_PREDICTION THRESHOLD POSITIVE_CLASS
0 892 3 Kelly, Mr. James ... 0 0.5 1
1 893 3 Wilkes, Mrs. James (Ellen Needs) ... 1 0.5 1
2 894 2 Myles, Mr. Thomas Francis ... 0 0.5 1
3 895 3 Wirz, Mr. Albert ... 0 0.5 1
4 896 3 Hirvonen, Mrs. Alexander (Helga E Lindqvist) ... 1 0.5 1
.. ... ... ... ... ... ... ...
413 1305 3 Spector, Mr. Woolf ... 0 0.5 1
414 1306 1 Oliva y Ocana, Dona. Fermina ... 0 0.5 1
415 1307 3 Saether, Mr. Simon Sivertsen ... 0 0.5 1
416 1308 3 Ware, Mr. Frederick ... 0 0.5 1
417 1309 3 Peter, Master. Michael J ... 1 0.5 1
[418 rows x 16 columns]
If you don’t want all of them or if you’re not happy with the names of the added columns, they can be modified using column remapping:
import datarobot as dr
import pandas as pd
deployment_id = '5dc5b1015e6e762a6241f9aa'
df = pd.read_csv('testdata/titanic_predict.csv')
job, df = dr.BatchPredictionJob.score_pandas(
deployment_id,
df,
column_names_remapping={
'Survived_1_PREDICTION': None, # discard column
'Survived_0_PREDICTION': None, # discard column
'Survived_PREDICTION': 'predicted', # rename column
'THRESHOLD': None, # discard column
'POSITIVE_CLASS': None, # discard column
},
)
Any column mapped to None
will be discarded. Any column mapped to a string will be renamed.
Any column not mentioned will be kept in the output untouched.
So your DataFrame will now contain the following extra columns:
predicted
prediction_status
Refer to the documentation for BatchPredictionJob.score
for the full range of available options.
Batch Prediction Job Definitions¶
To submit a working Batch Prediction job, you must supply a variety of elements to the datarobot.models.BatchPredictionJob.score()
request payload depending on what type of prediction is required. Additionally, you must consider the type of intake
and output adapters used for a given job.
Every time a new Batch Prediction is created, the same amount of information must be stored somewhere outside of DataRobot and re-submitted every time.
For example, a request could look like:
import datarobot as dr
deployment_id = "5dc5b1015e6e762a6241f9aa"
job = dr.BatchPredictionJob.score(
deployment_id,
intake_settings={
"type": "s3",
"url": "s3://bucket/container/file.csv",
"credential_id": "5dc5b1015e6e762a6241f9bb"
},
output_settings={
"type": "s3",
"url": "s3://bucket/container/output.csv",
"credential_id": "5dc5b1015e6e762a6241f9bb"
},
)
job.wait_for_completion()
with open("./predicted.csv", "wb") as f:
job.download(f)
Job Definitions¶
If your use case requires the same, or close to the same, type of prediction to be done multiple times, you can choose to create a Job Definition of the Batch Prediction job and store this inside DataRobot for future use.
The method for creating job definitions is identical to the existing datarobot.models.BatchPredictionJob.score()
method,
except for the addition of a enabled
, name
and schedule
parameter: datarobot.models.BatchPredictionJobDefinition.create()
>>> import datarobot as dr
>>> job_spec = {
... "num_concurrent": 4,
... "deployment_id": "5dc5b1015e6e762a6241f9aa",
... "intake_settings": {
... "url": "s3://foobar/123",
... "type": "s3",
... "format": "csv",
... "credential_id": "5dc5b1015e6e762a6241f9bb"
... },
... "output_settings": {
... "url": "s3://foobar/123",
... "type": "s3",
... "format": "csv",
... "credential_id": "5dc5b1015e6e762a6241f9bb"
... },
...}
>>> definition = BatchPredictionJobDefinition.create(
... enabled=False,
... batch_prediction_job=job_spec,
... name="some_definition_name",
... schedule=None
... )
>>> definition
BatchPredictionJobDefinition(foobar)
Note
The name
parameter must be unique across your organization. If you attempt to create multiple definitions
with the same name, the request will fail. If you wish to free up a name, you must first datarobot.models.BatchPredictionJobDefinition.delete()
the existing definition before creating this one. Alternatively you can just datarobot.models.BatchPredictionJobDefinition.update()
the existing definition with a new name.
Executing a job definition¶
Manual job execution¶
To submit a stored job definition for scoring, you can either do so on a scheduled basis, described
below, or manually submit the definition ID using datarobot.models.BatchPredictionJobDefinition.run_once()
,
as such:
>>> import datarobot as dr
>>> definition = dr.BatchPredictionJobDefinition.get("5dc5b1015e6e762a6241f9aa")
>>> job = definition.run_once()
>>> job.wait_for_completion()
Scheduled job execution¶
A Scheduled Batch Prediction job works just like a regular Batch Prediction job, except DataRobot handles the execution of the job.
In order to schedule the execution of a Batch Prediction job, a definition must first be created, using
datarobot.models.BatchPredictionJobDefinition.create()
, or updated, using
datarobot.models.BatchPredictionJobDefinition.update()
, where enabled
is set to True
and a schedule
payload is provided.
Alternatively, you can use a short-hand version with datarobot.models.BatchPredictionJobDefinition.run_on_schedule()
as such:
>>> import datarobot as dr
>>> schedule = {
... "day_of_week": [
... 1
... ],
... "month": [
... "*"
... ],
... "hour": [
... 16
... ],
... "minute": [
... 0
... ],
... "day_of_month": [
... 1
... ]
...}
>>> definition = dr.BatchPredictionJob.get("5dc5b1015e6e762a6241f9aa")
>>> job = definition.run_on_schedule(schedule)
If the created job was not enabled previously, this method will also enable it.
The Schedule
payload¶
The schedule
payload defines at what intervals the job should run, which can be combined in various ways to construct
complex scheduling terms if needed. In all of the elements in the objects, you can supply either an asterisk ["*"]
denoting “every” time denomination or an array of integers (e.g. [1, 2, 3]
) to define a specific interval.
Key |
Possible values |
Example |
Description |
---|---|---|---|
minute |
|
|
The job will run at these minute values for every hour of the day. |
hour |
|
|
The hour(s) of the day that the job will run. |
month |
|
|
Strings, either 3-letter abbreviations or the full name of the month, can be used interchangeably (e.g., “jan” or “october”). Months that are not compatible with |
day_of_week |
|
|
The day(s) of the week that the job will run. Strings, either 3-letter abbreviations or the full name of the day, can be used interchangeably (e.g., “sunday”, “Sunday”, “sun”, or “Sun”, all map to NOTE: This field is additive with |
day_of_month |
|
|
The date(s) of the month that the job will run. Allowed values are either NOTE: This field is additive with Invalid dates such as February 31st are ignored. |
Disabling a scheduled job¶
Job definitions are only be executed by the scheduler if enabled
is set to True
. If you have a job definition
that was previously running as a scheduled job, but should now be stopped, simply
datarobot.models.BatchPredictionJobDefinition.delete()
to remove it completely, or datarobot.models.BatchPredictionJobDefinition.update()
it with enabled=False
if you want to keep the definition, but stop the scheduled job from executing at intervals.
If a job is currently running, this will finish execution regardless.
>>> import datarobot as dr
>>> definition = dr.BatchPredictionJobDefinition.get("5dc5b1015e6e762a6241f9aa")
>>> definition.delete()