Writing new one-off jobs using MapReduce
Note: If you’re writing a new one-off job that needs to be run in production, please see the “Running Jobs in Production” wiki page in order to understand how the deployment procedure works.
Follow the steps below to write a new one-off job:
1. Add a new class to the `<domain-name>_jobs_one_off.py` file. Create the file if it doesn't exist.
2. Find the model you want to map over.
3. Add an `entity_classes_to_map_over` `@classmethod` that returns a list of the models you want to map over.
4. Create a `map` static method in the new one-off job class. It receives a single model instance as its argument.
5. In the `map` static method, write the work you want to perform on each model. Then `yield` a tuple of key and value to log any information you want from the job, such as success or failure messages.
6. Add a `reduce` static method. It receives each key yielded by `map`, together with the list of all values yielded under that key across every model `map` was called on. Anything you yield in this method is logged as a report from the one-off job. (See existing one-off jobs for more info.)
7. Add your one-off job class to the `ONE_OFF_JOB_MANAGER` list in the `jobs_registry.py` file.
8. Test the one-off job manually before using it. (You can follow these steps to test a one-off job through the admin page.)
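The following sketch shows how data moves between the `entity_classes_to_map_over`, `map` and `reduce` methods described above. It does not use Oppia's job base class. `HypotheticalModel`, `ExampleOneOffJob` and `run_locally` are hypothetical stand-ins. `run_locally` only imitates the map, group-by-key and reduce pipeline on an in-memory list. The real MapReduce framework runs this work across shards and may serialize keys and values (for example, to strings) between the two phases. This sketch does not model that serialization.

```python
import collections


class HypotheticalModel(object):
    """Hypothetical stand-in for a datastore model; only `id` and `deleted` are used."""

    def __init__(self, model_id, deleted=False):
        self.id = model_id
        self.deleted = deleted


class ExampleOneOffJob(object):
    """Hypothetical job; a real one would extend the one-off job base class."""

    @classmethod
    def entity_classes_to_map_over(cls):
        # The model class(es) whose instances are passed to map().
        return [HypotheticalModel]

    @staticmethod
    def map(item):
        # Work on one model, then yield (key, value) pairs as a log.
        if item.deleted:
            yield ('SKIPPED_DELETED', item.id)
            return
        yield ('SUCCESS', item.id)

    @staticmethod
    def reduce(key, values):
        # Receives one key plus every value that map() yielded under it.
        yield (key, sorted(values))


def run_locally(job_class, items):
    """Hypothetical helper: run map(), group values by key, then run reduce()."""
    grouped = collections.defaultdict(list)
    for item in items:
        for key, value in job_class.map(item):
            grouped[key].append(value)
    report = []
    for key in sorted(grouped):
        report.extend(job_class.reduce(key, grouped[key]))
    return report


report = run_locally(ExampleOneOffJob, [
    HypotheticalModel('exp1'),
    HypotheticalModel('exp2', deleted=True),
    HypotheticalModel('exp3'),
])
print(report)
# [('SKIPPED_DELETED', ['exp2']), ('SUCCESS', ['exp1', 'exp3'])]
```

Each key appears once in the report, so `reduce` is the natural place to summarize results, for example by counting failures.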
If your one-off job is too slow, or is running over only a small number of model types (classes), you can try increasing the number of shards. To do this, override the `enqueue` `@classmethod` and set `shard_count` to a higher number (the default is 8). Setting it too high can also break the one-off job, so test the job again even if you only changed `shard_count`.
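Below is a minimal sketch of the `enqueue` override pattern. So that it runs on its own, it uses a hypothetical stand-in base class, `HypotheticalBaseJobManager`, instead of the real job manager. Its `enqueue(job_id, additional_job_params=None, shard_count=None)` signature is an assumption modelled on the call in the example job, not a copy of the real one. The default of 8 shards comes from the text above. `SlowOneOffJob` and the shard count of 32 are illustrative only.

```python
class HypotheticalBaseJobManager(object):
    """Hypothetical stand-in for the real base class; it only records calls."""

    DEFAULT_SHARD_COUNT = 8
    last_enqueued = None

    @classmethod
    def enqueue(cls, job_id, additional_job_params=None, shard_count=None):
        if shard_count is None:
            shard_count = cls.DEFAULT_SHARD_COUNT
        # A real manager would schedule the job here; this stub just records it.
        HypotheticalBaseJobManager.last_enqueued = (job_id, shard_count)


class SlowOneOffJob(HypotheticalBaseJobManager):
    """Hypothetical job that overrides enqueue() to request more shards."""

    @classmethod
    def enqueue(cls, job_id, additional_job_params=None):
        # Forward additional_job_params so the override doesn't drop them.
        super(SlowOneOffJob, cls).enqueue(
            job_id, additional_job_params=additional_job_params,
            shard_count=32)


SlowOneOffJob.enqueue('job-123')
print(HypotheticalBaseJobManager.last_enqueued)  # ('job-123', 32)
HypotheticalBaseJobManager.enqueue('job-456')
print(HypotheticalBaseJobManager.last_enqueued)  # ('job-456', 8)
```

Only the subclass's shard count changes. Other jobs keep the default.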
Example of a one-off job:
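```python
# Imports as used by Oppia's core/domain modules at the time of writing;
# the exact module paths may differ in your version of the codebase.
from core import jobs
from core.domain import exp_fetchers
from core.platform import models

(exp_models,) = models.Registry.import_models([models.NAMES.exploration])


class InteractionAuditOneOffJob(jobs.BaseMapReduceOneOffJobManager):
    """Job that produces a list of (exploration, state) pairs, grouped by the
    interaction they use.
    """

    @classmethod
    def enqueue(cls, job_id, additional_job_params=None):
        # Use 64 shards instead of the default 8, and pass any extra
        # job parameters through to the base class.
        super(InteractionAuditOneOffJob, cls).enqueue(
            job_id, additional_job_params=additional_job_params,
            shard_count=64)

    @classmethod
    def entity_classes_to_map_over(cls):
        return [exp_models.ExplorationModel]

    @staticmethod
    def map(item):
        # Skip explorations that have been deleted.
        if item.deleted:
            return
        exploration = exp_fetchers.get_exploration_from_model(item)
        for state_name, state in exploration.states.items():
            exp_and_state_key = '%s %s' % (item.id, state_name)
            # Key: the interaction ID; value: "<exploration id> <state name>".
            yield (state.interaction.id, exp_and_state_key)

    @staticmethod
    def reduce(key, values):
        # One report entry per interaction ID, listing every matching state.
        yield (key, values)
```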