How to parallelize an Azure worker role?
I have a worker role running in Azure.
This worker processes a queue that contains a large number of integers. Each integer takes quite long to process (from 1 second to 10 minutes, depending on the integer).
As this is quite time consuming, I would like to run these processings in parallel. Unfortunately, my parallelization does not seem to be efficient when I test it with a queue of 400 integers.
Here is my implementation:
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.WindowsAzure.ServiceRuntime;

    public class WorkerRole : RoleEntryPoint
    {
        private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
        private readonly Manager _manager = Manager.Instance;
        private static readonly LogManager logger = LogManager.Instance;

        public override void Run()
        {
            logger.Info("Worker is running");
            try
            {
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            catch (Exception e)
            {
                logger.Error(e, 0, "Error Run Worker: " + e);
            }
            finally
            {
                // Signal OnStop() that the run loop has ended.
                this.runCompleteEvent.Set();
            }
        }

        public override bool OnStart()
        {
            bool result = base.OnStart();
            logger.Info("Worker has been started");
            return result;
        }

        public override void OnStop()
        {
            logger.Info("Worker is stopping");
            this.cancellationTokenSource.Cancel();
            this.runCompleteEvent.WaitOne();
            base.OnStop();
            logger.Info("Worker has stopped");
        }

        private async Task RunAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    _manager.ProcessQueue();
                }
                catch (Exception e)
                {
                    logger.Error(e, 0, "Error RunAsync Worker: " + e);
                }
            }
            // Note: this delay sits outside the loop, so it is only reached once the loop exits.
            await Task.Delay(1000, cancellationToken);
        }
    }
And the implementation of ProcessQueue:
    public void ProcessQueue()
    {
        try
        {
            // Refresh the queue metadata so ApproximateMessageCount is up to date.
            _queue.FetchAttributes();
            int? cachedMessageCount = _queue.ApproximateMessageCount;
            if (cachedMessageCount != null && cachedMessageCount > 0)
            {
                var listEntries = new List<CloudQueueMessage>();
                listEntries.AddRange(_queue.GetMessages(MAX_ENTRIES));
                // Blocks until every message in this batch has been processed.
                Parallel.ForEach(listEntries, ProcessEntry);
            }
        }
        catch (Exception e)
        {
            logger.Error(e, 0, "Error ProcessQueue: " + e);
        }
    }
And ProcessEntry:
    private void ProcessEntry(CloudQueueMessage entry)
    {
        try
        {
            int id = Convert.ToInt32(entry.AsString);
            Service.GetData(id);
            _queue.DeleteMessage(entry);
        }
        catch (Exception e)
        {
            // On failure, move the message to the error queue and remove it from the main queue.
            _queueError.AddMessage(entry);
            _queue.DeleteMessage(entry);
            logger.Error(e, 0, "Error ProcessEntry: " + e);
        }
    }
In the ProcessQueue function, I tried different values of MAX_ENTRIES: first 20, then 2. It seems to be slower with MAX_ENTRIES = 20, but whatever the value of MAX_ENTRIES is, it seems quite slow.
My VM is an A2 Medium.
I don't know if I am doing the parallelization correctly; maybe the problem comes from the worker itself (which may be hard to run in parallel).
You haven't mentioned which Azure messaging/queuing technology you are using. However, for tasks where I want to process multiple messages in parallel, I tend to use the message pump pattern on Service Bus queues and subscriptions, leveraging the OnMessage() method available on both the Service Bus queue and subscription clients:
- QueueClient OnMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.onmessage.aspx
- SubscriptionClient OnMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.subscriptionclient.onmessage.aspx
- An overview of how this stuff works :-) - http://fabriccontroller.net/blog/posts/introducing-the-event-driven-message-programming-model-for-the-windows-azure-service-bus/
From MSDN:
When calling OnMessage(), the client starts an internal message pump that constantly polls the queue or subscription. This message pump consists of an infinite loop that issues a Receive() call. If the call times out, it issues the next Receive() call.
This pattern allows you to use a delegate (or an anonymous function, in my preferred case) that handles the receipt of a BrokeredMessage instance on a separate thread in the WaWorkerHost process. In fact, to increase the level of throughput, you can specify the number of threads that the message pump should provide, thereby allowing you to receive and process 2, 4, 8 or more messages from the queue in parallel. You can additionally tell the message pump to automagically mark the message as complete when the delegate has finished processing it. Both the thread count and the AutoComplete instruction are passed in the OnMessageOptions parameter on the overloaded method.
    public override void Run()
    {
        var onMessageOptions = new OnMessageOptions()
        {
            AutoComplete = true,     // The message pump will call Complete() on messages after the callback has completed processing.
            MaxConcurrentCalls = 2   // Max number of threads the message pump can spawn to process messages.
        };

        sbQueueClient.OnMessage((brokeredMessage) =>
        {
            // Process the brokered message instance here
        }, onMessageOptions);

        RunAsync(_cancellationTokenSource.Token).Wait();
    }
You can still leverage the RunAsync() method to perform additional tasks on the main worker role thread if required.
Finally, I would also recommend that you look at scaling your worker role instances out to a minimum of 2 (for fault tolerance and redundancy) to increase your overall throughput. From what I have seen in multiple production deployments of this pattern, OnMessage() performs well when multiple worker role instances are running.
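To make the idea of a pump with a fixed number of concurrent calls more concrete, here is a minimal sketch that uses only the .NET base class library. It is not how Service Bus implements OnMessage(); the `MessagePumpSketch` class, the `PumpAsync` method and the in-memory `ConcurrentQueue<int>` standing in for the real queue are all hypothetical names chosen for illustration. Each of the `maxConcurrentCalls` loops picks up the next item as soon as it is free, whereas a `Parallel.ForEach` over a fetched batch only returns once the slowest item in that batch has finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MessagePumpSketch
{
    // Drains `queue`, running at most `maxConcurrentCalls` handlers at a time.
    // The in-memory queue is an assumption standing in for a real message queue.
    public static async Task PumpAsync(
        ConcurrentQueue<int> queue,
        Func<int, Task> handler,
        int maxConcurrentCalls,
        CancellationToken token)
    {
        // One loop per slot: each loop dequeues the next item as soon as its
        // previous handler call has completed.
        Task[] loops = Enumerable.Range(0, maxConcurrentCalls)
            .Select(_ => Task.Run(async () =>
            {
                while (!token.IsCancellationRequested && queue.TryDequeue(out int id))
                {
                    await handler(id);
                }
            }))
            .ToArray();

        // Completes when every loop has found the queue empty (or was cancelled).
        await Task.WhenAll(loops);
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 20));
        int processed = 0;

        PumpAsync(
            queue,
            async id =>
            {
                await Task.Delay(10 * id);            // stand-in for the real per-integer work
                Interlocked.Increment(ref processed); // thread-safe counter update
            },
            maxConcurrentCalls: 4,
            token: CancellationToken.None).Wait();

        Console.WriteLine($"Processed {processed} messages");
    }
}
```

With OnMessage(), the equivalent knob is MaxConcurrentCalls; the sketch only illustrates why a per-message pump keeps all slots busy when processing times vary widely.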