Stream Analytics is one of the essential parts of a typical IoT solution. When a huge number of devices produces telemetry data, some component has to analyze it. A typical IoT reference architecture almost always considers ingestion of data into some kind of temporary storage. At the time of writing this article, Azure Stream Analytics supports three types of sources that can be used for stream processing: Blob Storage, Event Hub and IoT Hub. This is shown in the picture below.
Devices sending telemetry data (events) to any of the three named storages.
Stream Analytics provides an option to register such a source as a so-called data input.
Once the stream is defined, we can start analyzing it. As shown in the first picture, Azure Stream Analytics (ASA from here on) consists of three components. The first one (on the left) is the input data stream described in the previous picture. You must define a single data source which will be used for analysis. Next, we have to define an output. This is the port where the result data of the analysis is streamed out. Currently there are more possible outputs than inputs, as shown in the next picture. Notice that almost all sources used for input are well-known technologies.
If we, for example, use SQL, then we can do with that data anything that is possible in the context of SQL Server. Notice that Event Hub and Blob Storage can be used as outputs too. This is because the output of one ASA job can be the input of the next ASA job. By using this technique, we can build a chain of parallel-processing ASA jobs. Interestingly, IoT Hub is not included in the list of outputs. This is reasonable, because IoT Hub is built on top of Event Hub. When processing a stream, ASA does not know anything about devices and the device registry. In other words, ASA does not need IoT Hub instead of Event Hub: sending data into Event Hub or IoT Hub does not make any difference for ASA. This is why IoT Hub is not in the list.
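To make the chaining idea concrete, here is a minimal sketch of two jobs (all stream names are assumptions, not taken from the examples below). The first job writes its result into an Event Hub, and that same Event Hub is registered as the input of the second job:

-- Job 1 (sketch; names are assumptions): pre-filter the raw stream
-- and write the result into an Event Hub registered as output.
SELECT * INTO intermediatehub FROM rawinput WHERE Temperature > 98

-- Job 2: the same Event Hub is registered as the input of this job,
-- which continues processing the pre-filtered stream.
SELECT DeviceId, Temperature INTO finaloutput FROM intermediatehub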
The last component of an ASA job is the query. Simplified, an ASA job is defined by an input stream, a query and an output stream, where the data flow is strictly defined:
Input Stream -> Query -> Output Stream
The actual processing magic is implemented in the query. The query language reference documentation and common query patterns are described in the official documentation.
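As a small flavor of such a pattern, the query language supports, among other things, CASE expressions for classifying events. A minimal sketch (the field names Temperature and DeviceId are assumptions about the telemetry payload):

-- Common pattern (sketch): classify each event with a CASE expression.
SELECT
    DeviceId,
    CASE
        WHEN Temperature > 98 THEN 'critical'
        ELSE 'normal'
    END AS Severity
INTO
    myoutputstream
FROM
    myinputstream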
Copy Stream
Let's start with the first example. The following query does not perform any analytics. It simply copies data from the input stream to the output stream. You may ask yourself why this is needed at all if it does not do anything. Such copy statements are helpful to copy data to the appropriate output. For example, Microsoft provides APIs for almost all hardware platforms to ingest data into the Event Hub. The statement below would copy that data into, for example, a SQL database. That means you can use ASA to simply stream data to the right destination.
SELECT * INTO myoutputstream FROM myinputstream;
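When the destination is a concrete schema such as a SQL table, it can make sense to project and convert only the columns the table expects instead of copying everything. A minimal sketch (the payload fields are assumptions; EventEnqueuedUtcTime is one of the system columns ASA provides for Event Hub inputs):

-- Shape the stream to match a SQL table (sketch; column names assumed).
SELECT
    DeviceId,
    CAST(Temperature AS float) AS Temperature,
    EventEnqueuedUtcTime AS EnqueuedAt
INTO
    myoutputstream
FROM
    myinputstream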
Analyzing the Stream with Time Windows
Next, let's do some analytics. The following statement calculates the average temperature per device over a tumbling window of 5 seconds. That means we are not calculating the average temperature on historical data, as a typical BI solution would do; we calculate it continuously over the most recent events.
SELECT
    DateAdd(second, -5, System.TimeStamp) AS WinStartTime,
    System.TimeStamp AS WinEndTime,
    DeviceId,
    AVG(Temperature) AS AvgTemperature,
    COUNT(*) AS EventCount
INTO
    myoutputstream
FROM
    myinputstream
GROUP BY
    TumblingWindow(second, 5),
    DeviceId;
If we pass the result of the calculation to a Power BI output, we can build a classical Power BI solution on top of the streaming result, as the following picture shows.
In this example we used a window of 5 seconds, but we could use any other meaningful value. Please note that when working with streams we do not necessarily need to store all data. However, be careful when making this decision. In most cases it is required to keep the original data for later processing, for the case that you discover new use cases which can leverage machine learning. Processing streams has the advantage of processing hot data, without any relation to historical data. Moreover, ASA provides parallel processing of the query, which is not possible with a common BI approach.
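Strictly speaking, the query above uses TumblingWindow, which produces fixed, non-overlapping 5-second windows. ASA also supports SlidingWindow (and HoppingWindow), where a result is produced whenever an event enters or leaves the window. A sketch of the same aggregation with a true sliding window:

-- Same aggregation with a sliding window (sketch): a result row is
-- emitted whenever the content of the 5-second window changes.
SELECT
    System.TimeStamp AS WinEndTime,
    DeviceId,
    AVG(Temperature) AS AvgTemperature,
    COUNT(*) AS EventCount
INTO
    myoutputstream
FROM
    myinputstream
GROUP BY
    SlidingWindow(second, 5),
    DeviceId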
Copy and Analyse
A good technique is to copy all data before processing for later use and then to process the data on the hot stream. The following example is a combination of the previous two examples.
SELECT * INTO rawdata FROM myinputstream;

SELECT
    DateAdd(second, -5, System.TimeStamp) AS WinStartTime,
    System.TimeStamp AS WinEndTime,
    DeviceId,
    AVG(Temperature) AS AvgTemperature,
    COUNT(*) AS EventCount
INTO
    averagetempoutput
FROM
    myinputstream
GROUP BY
    TumblingWindow(second, 5),
    DeviceId;
As you see in the example above, queries can be placed one after another in the same job. Note that parallel processing is still in place.
Cold and Hot Path analysis
ASA provides very high flexibility when it comes to changing use cases. Originally we started with copying the data (first statement in the following example). This is usually called the cold path: a copy of the raw data, typically used for big data processing or machine learning. Then we figured out that we can start analyzing critical events only (2). At some later time we decided to focus on a specific range (3) of devices which might produce critical events. And finally (4) we define critical devices (the hot path): a device is critical when 3 or more alerts are noticed within 120 seconds.
WITH CriticalEvents AS (
    SELECT * FROM inputalldevices WHERE Temperature > 98
)

-- (1) Copy all data
SELECT * INTO rawdata FROM inputalldevices

-- (2) Copy only critical events
SELECT * INTO allCritical FROM CriticalEvents

-- (3) Copy only critical events for devices in the id range 200-500
SELECT * INTO criticalRetail200500 FROM CriticalEvents
WHERE DeviceId < 500 AND DeviceId > 200

-- (4) Look for 3 or more critical events in a 120-second tumbling window
SELECT
    DateAdd(second, -120, System.TimeStamp) AS WinStartTime,
    System.TimeStamp AS WinEndTime,
    DeviceId,
    MAX(Temperature) AS MaxTemperature,
    COUNT(*) AS EventCount
INTO
    hotevents
FROM
    CriticalEvents
GROUP BY
    TumblingWindow(second, 120),
    DeviceId
HAVING [EventCount] >= 3
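One detail worth mentioning in this context: by default, ASA windows are based on the arrival time of the events. If the devices send their own timestamp in the payload, the TIMESTAMP BY clause makes the windows follow that application time instead. A minimal sketch, assuming the payload contains a field named EventTime:

-- Use the device-generated timestamp for windowing (sketch;
-- EventTime is an assumed field in the telemetry payload).
SELECT
    DeviceId,
    MAX(Temperature) AS MaxTemperature,
    COUNT(*) AS EventCount
INTO
    hotevents
FROM
    inputalldevices TIMESTAMP BY EventTime
GROUP BY
    TumblingWindow(second, 120),
    DeviceId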
Please note that whatever our statement looks like, and whenever we need to change it, it is simply a change of the query statement. Right now, changing the statement requires stopping the event stream processing. If you do not want to stop processing, you have to create a new ASA job whose input connects to a different consumer group of the event hub, assuming that an Event Hub is used for event ingestion, which is the usual case.
Posted Dec 08 2015, 08:37 AM by Damir Dobric