1.2. gearman.worker: Gearman worker

class gearman.worker.GearmanWorker(host_list=None)
    Interface to accept jobs from a Gearman server.
1.2.1. Job processing

GearmanWorker.set_client_id(client_id)
    Notify the server that we should be identified as this client ID.

GearmanWorker.register_task(task, callback_function)
    Register a function with this worker. The callback has the following form:

        def function_callback(calling_gearman_worker, current_job):
            return current_job.data

GearmanWorker.work(poll_timeout=60.0)
    Loop indefinitely, completing tasks from all connections.

Setting up a basic worker that reverses a given byte-string:

    import gearman

    gm_worker = gearman.GearmanWorker(['localhost:4730'])

    # See gearman/job.py to see attributes on the GearmanJob
    # Send back a reversed version of the 'data' string
    def task_listener_reverse(gearman_worker, gearman_job):
        # Slice rather than call reversed(), which returns an iterator, not a string
        return gearman_job.data[::-1]

    # gm_worker.set_client_id is optional
    gm_worker.set_client_id('your_worker_client_id_name')
    gm_worker.register_task('reverse', task_listener_reverse)

    # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
    gm_worker.work()
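
Because a task callback is an ordinary function taking a worker and a job, its logic can be checked without a running Gearman server. The following is a minimal sketch under that assumption: `FakeJob` is a hypothetical stand-in that models only the `data` attribute of a GearmanJob and is not part of the gearman package.

```python
from collections import namedtuple

# Hypothetical stand-in for GearmanJob: only the 'data' attribute is modelled
FakeJob = namedtuple('FakeJob', ['data'])


def task_listener_reverse(gearman_worker, gearman_job):
    # Slicing returns a new string; reversed() would return an iterator instead
    return gearman_job.data[::-1]


# This callback never touches the worker argument, so None is enough here
result = task_listener_reverse(None, FakeJob(data='hello'))
print(result)
```

The same function object can then be passed unchanged to `register_task`.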

1.2.2. Sending in-flight job updates

GearmanWorker.send_job_data(current_job, data, poll_timeout=None)
    Send a Gearman JOB_DATA update for an inflight job.

GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None)
    Send a Gearman JOB_STATUS update for an inflight job.

GearmanWorker.send_job_warning(current_job, data, poll_timeout=None)
    Send a Gearman JOB_WARNING update for an inflight job.

Callback function sending back inflight job updates:

    import gearman

    gm_worker = gearman.GearmanWorker(['localhost:4730'])

    # See gearman/job.py to see attributes on the GearmanJob
    # Send back a reversed version of the 'data' string through WORK_DATA instead of WORK_COMPLETE
    def task_listener_reverse_inflight(gearman_worker, gearman_job):
        # Slice rather than call reversed(): an iterator has no len()
        reversed_data = gearman_job.data[::-1]
        total_chars = len(reversed_data)

        for idx, character in enumerate(reversed_data):
            gearman_worker.send_job_data(gearman_job, str(character))
            gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)

        return None

    # gm_worker.set_client_id is optional
    gm_worker.set_client_id('your_worker_client_id_name')
    gm_worker.register_task('reverse', task_listener_reverse_inflight)

    # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
    gm_worker.work()
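
To see the order in which such a callback emits its updates, the worker can be replaced by an object that records calls instead of sending them. This is only a sketch: `RecordingWorker` and `FakeJob` are hypothetical stand-ins, not gearman classes. They copy the `send_job_data` and `send_job_status` signatures listed above and nothing else.

```python
class RecordingWorker(object):
    """Hypothetical stand-in that records updates instead of sending them."""

    def __init__(self):
        self.updates = []

    def send_job_data(self, current_job, data, poll_timeout=None):
        self.updates.append(('data', data))

    def send_job_status(self, current_job, numerator, denominator, poll_timeout=None):
        self.updates.append(('status', numerator, denominator))


class FakeJob(object):
    """Hypothetical stand-in exposing only the 'data' attribute."""

    def __init__(self, data):
        self.data = data


def task_listener_reverse_inflight(gearman_worker, gearman_job):
    reversed_data = gearman_job.data[::-1]
    total_chars = len(reversed_data)
    for idx, character in enumerate(reversed_data):
        # One data update per character, followed by a progress update
        gearman_worker.send_job_data(gearman_job, str(character))
        gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)
    return None


recorder = RecordingWorker()
task_listener_reverse_inflight(recorder, FakeJob('abc'))
for update in recorder.updates:
    print(update)
```

Each character produces a data update followed by a status update, so a three-character input yields six recorded calls, ending with a status of 3 out of 3.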

1.2.3. Extending the worker

GearmanWorker.data_encoder = <class 'gearman.connection_manager.NoopEncoder'>

GearmanWorker.after_poll(any_activity)
    Polling callback to notify any outside listeners what's going on with the GearmanWorker.
    Return True to continue polling, False to exit the work loop.
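
The True/False contract of `after_poll` can be illustrated without a server. The sketch below is an assumption-laden simplification: `IdleLimitedPoller` and `simulate_work_loop` are hypothetical names, and the loop is a stand-in for the real work loop rather than its actual implementation. It shows one possible policy, stopping after a fixed number of consecutive polls with no activity.

```python
class IdleLimitedPoller(object):
    """Hypothetical object following the after_poll contract described above."""

    def __init__(self, max_idle_polls):
        self.max_idle_polls = max_idle_polls
        self.idle_polls = 0

    def after_poll(self, any_activity):
        # Reset the counter on activity, otherwise count one more idle poll
        if any_activity:
            self.idle_polls = 0
        else:
            self.idle_polls += 1
        # True keeps polling; False asks the loop to exit
        return self.idle_polls < self.max_idle_polls


def simulate_work_loop(poller, activity_sequence):
    """Simplified stand-in for the work loop: returns how many polls ran."""
    polls_run = 0
    for any_activity in activity_sequence:
        polls_run += 1
        if not poller.after_poll(any_activity):
            break
    return polls_run


polls = simulate_work_loop(IdleLimitedPoller(max_idle_polls=2),
                           [True, False, True, False, False, True])
print(polls)
```

In a real worker the same `after_poll` body would live on a GearmanWorker subclass, and `work()` would supply the `any_activity` argument.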

Send/receive Python objects and do work between polls:

    # By default, GearmanWorkers can only send off byte-strings
    # If we want to be able to send out Python objects, we can specify a data encoder
    # This will automatically convert byte strings <-> Python objects for ALL commands that have the 'data' field
    #
    # See http://gearman.org/index.php?id=protocol for Worker commands that send/receive 'opaque data'
    #
    import json  # Or similarly styled library

    import gearman

    class JSONDataEncoder(gearman.DataEncoder):
        @classmethod
        def encode(cls, encodable_object):
            return json.dumps(encodable_object)

        @classmethod
        def decode(cls, decodable_string):
            return json.loads(decodable_string)

    class DBRollbackJSONWorker(gearman.GearmanWorker):
        data_encoder = JSONDataEncoder

        def after_poll(self, any_activity):
            # After every select loop, let's rollback our DB connections just to be safe
            # (self.db_connections is assumed to be set up elsewhere, e.g. in __init__)
            continue_working = True
            self.db_connections.rollback()
            return continue_working
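
The encode/decode pair can be checked on its own before it is attached to a worker. The following is a minimal sketch: to stay runnable without the gearman package, this `JSONDataEncoder` is a plain class rather than a `gearman.DataEncoder` subclass, and the `payload` value is made up for illustration.

```python
import json


class JSONDataEncoder(object):
    # Plain class for illustration; a real worker would subclass gearman.DataEncoder
    @classmethod
    def encode(cls, encodable_object):
        return json.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return json.loads(decodable_string)


# Hypothetical payload, used only to exercise the round trip
payload = {'task': 'reverse', 'args': [1, 2, 3]}
wire_data = JSONDataEncoder.encode(payload)
round_tripped = JSONDataEncoder.decode(wire_data)
print(round_tripped == payload)
```

JSON does not preserve every Python type. A tuple, for example, comes back as a list, so both ends of a job should agree on JSON-friendly structures.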