Once data is stored in the IotHub, it can remain there for a maximum of 7 days. As long as the data is in the hub, it can be read by various readers. Commonly, data is read by using Azure Stream Analytics. However, sometimes it is required to implement a native reader. There are two approaches to reading events from IotHub:
- DirectConsumers and
- EventHubProcessor.
DirectConsumer is used when events can be read at a lower speed. That typically means a single process is reading from multiple partitions. For many scenarios this is sufficient.
EventHubProcessor is used when multiple consumers, running in different processes on different nodes, need to read events at high speed simultaneously. It is a load-balanced event reader solution built as a distributed system.
In this post I will describe how to implement a direct consumer. To receive the data from IotHub or EventHub we will use EventHubClient. That API can be used to receive data from both EventHub and IotHub without any code change.
EventHubClient eventHubClient =
EventHubClient.CreateFromConnectionString(m_ConnStr, endpointPath);
Here endpointPath is typically "messages/events". Using that endpoint, it is possible to receive events from both an EventHub endpoint and an IotHub endpoint.
In this example, a single consumer will connect to all partitions of the IotHub (EventHub) and receive events. The partition identifiers, and with them the number of partitions, can be obtained with the following line of code:
var partitions= eventHubClient.GetRuntimeInformation().PartitionIds;
This is important because we can explicitly specify the partition from which events should be read. When high performance is required, the best choice is to connect a single reader to a single partition. Connecting multiple readers to the same partition is allowed, but such a “competing consumer” pattern does not provide the best performance, so you should avoid it.
Events are typically received from a partition by using EventHubReceiver:
var eventHubReceiver = eventHubClient.GetConsumerGroup("mygroup").CreateReceiver(partition, DateTime.Now);
EventData eventData = await eventHubReceiver.ReceiveAsync();
When creating the receiver, specifying the consumer group name is optional. By invoking GetDefaultConsumerGroup instead of GetConsumerGroup, you receive events from the $Default consumer group.
Additionally, when creating the receiver, you need to provide a starting timestamp. This is the point in time from which events are read. In the sample above we used DateTime.Now, which means that only events arriving from now on are received.
A single received event is represented by the EventData class. The following picture shows one event:
The properties which you see above are system properties. Custom properties are typically stored in eventData.Properties. In this case the device didn’t specify any custom property.
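Reading a custom property usually comes down to a key lookup plus a type check, because eventData.Properties is a dictionary with string keys and object values. The following is only a minimal sketch: it works on a plain dictionary so it can run without a hub connection, and the helper name GetPropertyOrDefault as well as the "deviceType" key are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class EventProperties
{
    // Hypothetical helper (not part of the SDK): returns the property value if the
    // key exists and the value has the expected type, otherwise the fallback.
    public static T GetPropertyOrDefault<T>(
        IDictionary<string, object> properties, string key, T fallback)
    {
        if (properties != null
            && properties.TryGetValue(key, out object value)
            && value is T typed)
        {
            return typed;
        }
        return fallback;
    }
}
```

With a received event, a call might look like EventProperties.GetPropertyOrDefault(eventData.Properties, "deviceType", "unknown"), assuming the device sets a string property with that name.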
The body of the event (which cannot be seen in the debugger) can be retrieved by using the following code:
string data = Encoding.UTF8.GetString(eventData.GetBytes());
Further, you might want to access the values inside the body. The following code snippet shows how to do this:
static void readEvent(EventData eventData)
{
    // Decode the raw body and parse it as JSON.
    string data = Encoding.UTF8.GetString(eventData.GetBytes());
    var sensorEvent = JsonConvert.DeserializeObject<dynamic>(data);
    Console.WriteLine($"T={sensorEvent.Temperature} Celsius, I= {sensorEvent.Current} A, L={sensorEvent.Location}");
}
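As a possible alternative to dynamic, the body can be deserialized into a small class, which gives compile-time checking of the property names. This is only a sketch: the class SensorEvent and the parser are hypothetical, the property names are taken from the snippet above, and the property types are assumptions about the payload.

```csharp
using System;
using Newtonsoft.Json;

// Hypothetical payload shape. The names come from the snippet above,
// the types (double, double, string) are assumptions.
public class SensorEvent
{
    public double Temperature { get; set; }
    public double Current { get; set; }
    public string Location { get; set; }
}

public static class SensorEventParser
{
    // Parses the already decoded event body. Returns null when the body is empty
    // or cannot be deserialized, so one bad message does not stop a receive loop.
    public static SensorEvent TryParse(string body)
    {
        if (string.IsNullOrWhiteSpace(body))
        {
            return null;
        }
        try
        {
            return JsonConvert.DeserializeObject<SensorEvent>(body);
        }
        catch (JsonException)
        {
            // Base class of the reader and serialization exceptions of Json.NET.
            return null;
        }
    }
}
```

The caller then checks the result for null before printing sensorEvent.Temperature and the other values.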
Putting it all together:
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

class Program
{
    /// <summary>
    /// Go to the IotHub portal and copy the Shared Access Policy with the name 'service'.
    /// </summary>
    static string m_ConnStr = "**";
    static string m_HubToDeviceEndpoint = "messages/events";
    static EventHubClient eventHubClient;

    static void Main(string[] args)
    {
        eventHubClient = EventHubClient.CreateFromConnectionString(m_ConnStr, m_HubToDeviceEndpoint);

        // Start one receive loop per partition.
        var partitions = eventHubClient.GetRuntimeInformation().PartitionIds;
        foreach (string partition in partitions)
        {
            ReceiveMessagesFromDeviceAsync(partition);
        }

        // Keep the process alive while the loops are running.
        Console.ReadLine();
    }

    private async static Task ReceiveMessagesFromDeviceAsync(string partition)
    {
        var eventHubReceiver = eventHubClient.GetConsumerGroup("mygroup").CreateReceiver(partition, DateTime.Now);
        while (true)
        {
            // ReceiveAsync returns null if no event arrived in time.
            EventData eventData = await eventHubReceiver.ReceiveAsync();
            if (eventData == null) continue;

            string data = Encoding.UTF8.GetString(eventData.GetBytes());
            Console.WriteLine(string.Format("Message received. Partition: {0} Data: '{1}'", partition, data));
            readProperties(data);
        }
    }

    private static void readProperties(string data)
    {
        var sensorEvent = JsonConvert.DeserializeObject<dynamic>(data);
        Console.WriteLine($"T={sensorEvent.Temperature} Celsius, I= {sensorEvent.Current} A, L={sensorEvent.Location}");
    }
}
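The program above starts one receive loop per partition without keeping the returned tasks, so an exception thrown inside a loop would go unnoticed. One possible variation, shown here only as a sketch, keeps the tasks and waits until the first loop ends. The class and method names are hypothetical, and the per-partition work is passed in as a delegate so that the example runs without a hub connection.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionReaders
{
    // Starts one reader per partition and completes as soon as the first reader
    // ends. If that reader failed, its exception is rethrown to the caller.
    public static async Task RunUntilFirstExitAsync(
        IEnumerable<string> partitions, Func<string, Task> readPartitionAsync)
    {
        // ToList forces every reader to start before we begin waiting.
        List<Task> readers = partitions.Select(readPartitionAsync).ToList();
        if (readers.Count == 0)
        {
            return; // Task.WhenAny does not accept an empty collection.
        }

        Task first = await Task.WhenAny(readers);
        await first; // Rethrows the exception if that reader faulted.
    }
}
```

In Main this could replace the foreach loop, for example with PartitionReaders.RunUntilFirstExitAsync(partitions, ReceiveMessagesFromDeviceAsync).GetAwaiter().GetResult(). The remaining readers keep running in the background, so stopping them would need additional handling that is not shown here.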
Posted Jun 02 2016, 06:51 AM by Damir Dobric