This post gives you a quick overview of the new features in Service Bus 2.0. Note that I did not describe the MessageBrowse feature, because it is already well described by other colleagues on the SB team.
MessagePump
When receiving messages from a queue, you used to be responsible for implementing some kind of listener that receives messages and dispatches them to a message processor.
With Service Bus 2.0 you can hand this responsibility over. SB v2 provides an event-driven message receiver based on a message pump implementation.
The following code snippet shows how easily you can receive messages now.

    client.OnMessage((msg) =>
    {
        Console.WriteLine("ThreadId: {0}, Message: {1}",
            Thread.CurrentThread.ManagedThreadId, msg.GetBody<string>());

        if (cnt == 10) // because we expect 10 messages.
            return;
    },
    new OnMessageOptions() { AutoComplete = true, MaxConcurrentCalls = 5 });

OnMessageOptions can be used to specify the number of messages processed concurrently (MaxConcurrentCalls). If you want to process messages one by one (one thread only), specify the value 1 here.
Independent of this value, messages are always processed on another thread.
Last but not least, set AutoComplete in OnMessageOptions if you want the message to be completed after the callback has executed successfully.
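
To make the meaning of MaxConcurrentCalls more tangible, here is a minimal sketch of a pump with bounded concurrency. It is only an illustration in plain .NET and not the way the SDK implements its pump. The names PumpSketch and Pump are made up for this example, and the messages are plain strings instead of BrokeredMessage instances.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PumpSketch
{
    // Hypothetical helper, not part of the Service Bus SDK.
    // Dispatches every message to the callback on thread-pool threads and
    // never runs more than maxConcurrentCalls callbacks at the same time.
    // Returns the highest number of callbacks observed running in parallel.
    public static int Pump(IEnumerable<string> messages, int maxConcurrentCalls, Action<string> onMessage)
    {
        var gate = new SemaphoreSlim(maxConcurrentCalls);
        var tasks = new List<Task>();
        object sync = new object();
        int running = 0;
        int peak = 0;

        foreach (string msg in messages)
        {
            // Blocks while maxConcurrentCalls callbacks are still busy.
            gate.Wait();
            string current = msg;

            tasks.Add(Task.Run(() =>
            {
                lock (sync)
                {
                    running++;
                    if (running > peak) peak = running;
                }
                try
                {
                    onMessage(current);
                }
                finally
                {
                    lock (sync) { running--; }
                    gate.Release();
                }
            }));
        }

        // Wait until all dispatched callbacks have finished.
        Task.WaitAll(tasks.ToArray());
        return peak;
    }
}
```

Calling PumpSketch.Pump(messages, 1, handler) handles the messages one at a time, but the handler still runs on a thread-pool thread and not on the calling thread, which mirrors the behavior described above.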
Suspend and Resume Queue
Queues and topics can now be suspended and resumed. This is achieved by changing the status of the entity:

    // Summary:
    //     Enumerates the possible values for the status of a messaging entity.
    [DataContract(Name = "EntityStatus", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
    public enum EntityStatus
    {
        // Summary:
        //     The status of the messaging entity is active.
        [EnumMember]
        Active = 0,
        //
        // Summary:
        //     The status of the messaging entity is disabled.
        [EnumMember]
        Disabled = 1,
        //
        // Summary:
        //     Resuming the previous status of the messaging entity.
        [EnumMember]
        Restoring = 2,
        //
        // Summary:
        //     The sending status of the messaging entity is disabled.
        [EnumMember]
        SendDisabled = 3,
        //
        // Summary:
        //     The receiving status of the messaging entity is disabled.
        [EnumMember]
        ReceiveDisabled = 4,
    }

The following sample illustrates how this works:

    /// <summary>
    /// Suspend and Resume sample.
    /// </summary>
    private static void suspendResumeSamnple()
    {
        string qName = "hello/msgpump";

        var qDesc = prepareQueue(qName);

        var client = QueueClient.CreateFromConnectionString(m_ConnStr, qName, ReceiveMode.PeekLock);

        Console.WriteLine("Sending messages...");

        for (int i = 0; i < 10; i++)
        {
            client.Send(new BrokeredMessage("i = " + i.ToString() + ", payload = " + new Random().Next().ToString()));
        }

        int cnt = 10;

        client.OnMessage((msg) =>
        {
            Console.WriteLine("ThreadId: {0}, Message: {1}",
                Thread.CurrentThread.ManagedThreadId, msg.GetBody<string>());

            Thread.Sleep(1000);

            if (cnt == 10)
                return;
        },
        new OnMessageOptions() { AutoComplete = true });

        // Wait to receive few messages.
        Thread.Sleep(2000);

        NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(m_ConnStr);

        // Suspend Queue
        qDesc.Status = EntityStatus.ReceiveDisabled;
        namespaceManager.UpdateQueue(qDesc);

        Console.WriteLine("Receive disabled!");

        Thread.Sleep(25000);

        // Resume Queue
        qDesc.Status = EntityStatus.Active;
        namespaceManager.UpdateQueue(qDesc);

        Console.WriteLine("Receive enabled...");

        Console.ReadLine();
    }

And finally, the picture below shows that receiving of messages is stopped for a while. Note that the message pump might already have received a message at the moment the receive operation is disabled. See the message underlined in blue. To resume receiving (or any other suspended operation), just set the queue status to Active again.
AutoDelete Queue on Idle
This feature enables the automatic removal of queues that have not been used for a while.
The following sample illustrates this. After the sleep time (longer than the specified delete-on-idle time) you can check whether the queue still exists.
If all works fine, the queue will not exist anymore. Note that the specified time is not very exact. Do not expect it to be as precise as a Swiss watch.
The queue will not be deleted one second after the specified time. I tried an idle time of 5 minutes and a sleep of 6 minutes, and the queue had not been deleted.

    private static void autodeletOnIdle()
    {
        var qDesc = prepareQueue("hello/autodelete");

        qDesc.AutoDeleteOnIdle = TimeSpan.FromMinutes(5);

        NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(m_ConnStr);

        namespaceManager.UpdateQueue(qDesc);

        // Sleep longer than the configured idle time.
        Thread.Sleep(6 * 60000);

        // If the queue no longer exists, it has been removed by the service.
        if (!namespaceManager.QueueExists("hello/autodelete"))
        {
            Console.WriteLine("Queue has been deleted");
        }
        else
        {
            Console.WriteLine("Queue has NOT been deleted!");
        }

        Console.ReadLine();
    }

The value supplied for AutoDeleteOnIdle must be between 00:05:00 and 10675199.02:48:05.4775807.
All queues created before SB 2.0 have the same default value as newly created queues. This value is the maximum allowed value, 10675199.02:48:05.4775807.
There are more details about this in Haishi’s post.
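
The upper bound quoted above is exactly TimeSpan.MaxValue in .NET, which explains the odd-looking number. The following minimal sketch checks a value against the documented range before you send it to the service. The class and method names are made up for this example and are not part of the SDK.

```csharp
using System;

public static class AutoDeleteOnIdleSketch
{
    // Lower bound taken from the range quoted in this post (00:05:00).
    public static readonly TimeSpan Minimum = TimeSpan.FromMinutes(5);

    // Hypothetical helper: true if the value lies within the documented range.
    // The upper bound is TimeSpan.MaxValue, so only the lower bound can be violated.
    public static bool IsValid(TimeSpan idle)
    {
        return idle >= Minimum;
    }

    // The documented maximum, formatted the same way as in the text above.
    public static string MaximumAsText()
    {
        return TimeSpan.MaxValue.ToString(); // "10675199.02:48:05.4775807"
    }
}
```

For example, AutoDeleteOnIdleSketch.IsValid(TimeSpan.FromMinutes(4)) returns false, while five minutes or more is accepted.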
Authorization Rules
One of the really interesting improvements in V2 is the support for Shared Access Signatures. The problem we usually have is related to sharing keys and maintaining service identities in Access Control Service.
This is a task which I, and most of you out there, do not like. Now you can set permissions on the queue or topic directly. The authorization is defined as a list of rules, and one rule looks like this:
authorizationRule = ( KeyName, KeyValue, [AccessRule1, .., AccessRuleN]);
Programmatically, an authorization rule can be created as follows:
var rule = new SharedAccessAuthorizationRule("demouser1_can_Send", key1, new AccessRights[] { AccessRights.Send });
The first argument is simply the name of the rule. For example: Rule1, ReadRule or transferAccountQueue.
The key must be 44 bytes long. I guess this is 256 bits, base64 encoded. Otherwise you will get the following error:
The remote server returned an error: (400) Bad Request. A SharedAccessAuthorizationRule only supports keys of size 44 bytes..
Anyhow, to be sure, you can use a helper method to create the key:
string key1 = SharedAccessAuthorizationRule.GenerateRandomKey();
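
The guess about 256 bits is easy to check with plain .NET: 32 random bytes always encode to 44 base64 characters. The sketch below only demonstrates this arithmetic. The class and method names are made up, and I assume (without having verified it) that the service accepts any key of this shape, so GenerateRandomKey remains the safe choice.

```csharp
using System;
using System.Security.Cryptography;

public static class SasKeySketch
{
    // Hypothetical helper, not part of the SDK:
    // creates 32 random bytes (256 bits) and returns them base64 encoded.
    public static string CreateKey()
    {
        var bytes = new byte[32];
        using (var rng = RandomNumberGenerator.Create())
        {
            rng.GetBytes(bytes);
        }

        // 32 bytes -> ceil(32 / 3) * 4 = 44 characters, including one '=' of padding.
        return Convert.ToBase64String(bytes);
    }
}
```

SasKeySketch.CreateKey().Length is always 44, which matches the size named in the error message.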
Then you have to append the list of rights:

    // Summary:
    //     Specifies the possible access rights for a user.
    [DataContract(Name = "AccessRights", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
    public enum AccessRights
    {
        // Summary:
        //     The access right is Manage.
        [EnumMember]
        Manage = 0,
        //
        // Summary:
        //     The access right is Send.
        [EnumMember]
        Send = 1,
        //
        // Summary:
        //     The access right is Listen.
        [EnumMember]
        Listen = 2,
    }

After you set up the permissions, participants can do the following to access the entity:
var factory1 = MessagingFactory.Create(new Uri("sb://xyyyyy.servicebus.windows.net"),
TokenProvider.CreateSharedAccessSignatureTokenProvider("your rule key name", yourkey));
var clientDemoUser1 = factory1.CreateQueueClient(qName, ReceiveMode.PeekLock);
As long as the keyName and key match, all will work fine. Note that in this sample we didn’t use any credentials from Access Control Service.
The following full sample illustrates how to deal with Shared Access Signatures.

    private static void authorizationRulesSample()
    {
        NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(m_ConnStr);

        string qName = "hello/rule1q";

        string key1 = SharedAccessAuthorizationRule.GenerateRandomKey();
        string key2 = SharedAccessAuthorizationRule.GenerateRandomKey();

        if (namespaceManager.QueueExists(qName))
            namespaceManager.DeleteQueue(qName);

        QueueDescription qDesc1 = new QueueDescription(qName);

        qDesc1.Authorization.Add(new SharedAccessAuthorizationRule("demouser1_can_Send", key1, new AccessRights[] { AccessRights.Send }));
        qDesc1.Authorization.Add(new SharedAccessAuthorizationRule("demouser2_can_Listen", key2, new AccessRights[] { AccessRights.Listen }));
        //qDesc1.Authorization.Add(new SharedAccessAuthorizationRule("how_to_set_admin_perm", key3, new AccessRights[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send }));

        namespaceManager.CreateQueue(qDesc1);

        var factory1 = MessagingFactory.Create(new Uri("sb://frankfurt.servicebus.windows.net"),
            TokenProvider.CreateSharedAccessSignatureTokenProvider("demouser1_can_Send", key1));

        var factory2 = MessagingFactory.Create(new Uri("sb://frankfurt.servicebus.windows.net"),
            TokenProvider.CreateSharedAccessSignatureTokenProvider("demouser2_can_Listen", key2));

        var clientDemoUser1 = factory1.CreateQueueClient(qName, ReceiveMode.PeekLock);
        var clientDemoUser2 = factory2.CreateQueueClient(qName, ReceiveMode.PeekLock);

        clientDemoUser1.Send(new BrokeredMessage("abc"));

        try
        {
            // This one has not a right to send to queue.
            clientDemoUser2.Send(new BrokeredMessage("abc"));
        }
        catch (UnauthorizedAccessException)
        {
        }

        try
        {
            // This one has not a right to Listen on queue.
            var msg = clientDemoUser1.Receive();
        }
        catch (UnauthorizedAccessException)
        {
        }

        // This one can listen.
        var msg2 = clientDemoUser2.Receive();

        Console.ReadLine();
    }

More about this topic: http://haishibai.blogspot.de/2013/01/new-features-in-service-bus-preview_29.html
Task Based Api
Most Service Bus methods now provide async/await support. More about this feature:
http://blogs.msdn.com/b/windowsazure/archive/2013/04/11/task-based-apis-for-service-bus.aspx
(image copied from msdn – see URL above)
Video about SB v2 features:
http://channel9.msdn.com/Blogs/Subscribe/Whats-new-in-the-Service-Bus-NET-SDK-20
Sample for this solution:
http://code.msdn.microsoft.com/ServiceBus-V2-Features-6b29f2ca
Posted May 01 2013, 11:30 AM by Damir Dobric