Бессерверная работа с IoT и функциями Azure

Tags: IoT, Azure, Serverless

Концепция интернет-вещей фигурирует почти в каждой статье о будущих перспективах ИТ. И это воистину увлекательная тема, ее сценарии безграничны, а случаи использования хорошо прощупываются, кажется, что мы могли бы реализовать их много лет назад. Но давайте отбросим философские понятия; мы хотели бы продемонстрировать технологии для решения IoT на основе бессерверных предложений Azure. Давайте не будем медлить.

Интернет вещей (IoT) - может быть реализован множеством способов. Сегодня мы расскажем об использовании облачных функций Microsoft Azure, в частности использовании приложений Azure Function, Azure Service Bus и Azure Storage - с очень небольшим количеством приложений Azure Logic.Так что здесь будет много об Azure, в частности - о бессерверных приложений. Если вы еще не знакомы с термином "бессерверный", то мы даем ему такое определение, как использование ресурсов без локального содержания. Конечно, мы все хорошо знаем, как ни один запрос на определенный ресурс никогда не будет «без сервера»; до тех пор, пока в глубине многоуровневого стека есть какой-то физический аппаратный элемент, которому суждено обеспечить ответ на запрос. Таким образом, термин «бессерверный» не следует воспринимать буквально, в отличие от предоставления прозвища для загрузки и запуска ресурсов и ответа на запросы, не заботясь о физических ограничениях оборудования. Так что вам не нужно, в честь фразы термина, настроить фактический сервер для ответа на запросы, вместо этого вы можете просто развернуть сам механизм ответа на облако Azure и вместо того, чтобы ориентироваться на аппаратное метрики - сколько процессоров, будь то развертывание балансировки нагрузки, такие соображения - вы можете сосредоточиться на том, как вы хотите, чтобы механизм ответа работал, например, как масштабировать. Таким образом, вам не нужно настраивать фактический сервер для ответа на запросы, вместо этого вы можете просто развернуть сам механизм ответа на облако Azure и вместо того, чтобы ориентироваться на аппаратные метрики - число процессоров, как развернуть балансировку нагрузки и подобные соображения - вы можете сосредоточиться на том, как вы хотите, чтобы работал механизм ответа, например, как масштабироваться.

Но мы отвлеклись; вы, вероятно, полностью осознаете этот термин и его последствия. Однако для этой статьи серверный сервер также подразумевает рассмотрение настройки: наши IoT-устройства будут отправлять и получать данные из вышеупомянутых облачных сервисов. Это, в отличие от разрешения запросов, обрабатывается центральным брокером, таким как реализация MQTT, например. Это целенаправленный выбор; эта конкретная реализация архитектуры IoT подходит для небольших проектов, где требования могут быть более легко исправлены и, таким образом, мы можем сделать эти предположения, которые мы в противном случае абстрагировали бы. Кроме того, дальнейшие ограничения этой конкретной реализации, с последующим внесением поправок, связаны с отсутствием приоритетов и обеспечения безопасности устройств, а также совершенно сложными темами. Не ошибитесь, эта реализация IoT будет работать нормально и иметь крайне ограниченный финансовый эффект. Если вы имеете дело с десятками тысяч различных датчиков и устройств, во множестве местоположений, вы, скорее всего, найдете лучшее использование набора выделенного портфеля Microsoft IoT, и мы рекомендуем вам далее изучать это направление. 

Утомительные ограничения в сторону, давайте начинать! Пожалуйста, обратите внимание на приведенную ниже диаграмму архитектуры, когда мы проходим через простой случай использования, рассматривая технологический выбор.
Пример будет иметь температурный датчик, который отправляет считывание температуры в наш облачный сервер, при необходимости, в свою очередь, для получения команды, предназначенной для конкретного устройства.
Устройство распределяет свое считывание на Azure Function (1) посредством HTTPS REST-вызова. Функция отвечает за хранение необработанных данных в очереди Azure Storage (2). Функции Azure чрезвычайно дешевы для выполнения - у вас могут быть миллионы выполнений по очень низкой цене, что идеально подходит для сети датчиков, которые часто отправляют данные. Подобный финансовый аргумент применяется к учетной записи Azure Storage, которую мы будем использовать для хранения данных датчиков. Хранение очереди идеально подходит для наших целей; оно предназначено для хранения рабочих элементов, которые будут обработаны на каком-то более позднем этапе, после чего данные будут просто удалены из очереди - и мы также можем указать выделенный жизненный цикл для данных, если это необходимо. Но, самое главное, мы можем использовать специальный триггер приложения приложения Azure (3), который активирует новые элементы в очереди. В этом конкретном случае он извлекает данные из очереди и, исходя из необработанных данных датчика, создает более конкретную и обогащенную модель данных. Мы, конечно, могли бы сделать это в первой функции Azure, но важна абстракция, поскольку она позволяет нам вводить здесь бизнес-логику, если это понадобится позже. В настоящее время единственной логикой является извлечение информации сенсорного устройства из таблицы хранения Azure (4) и добавление бит этой информации в сообщение устройства, которое затем переходит в таблицу хранения Azure, которая содержит данные датчика (5) - но позже мы могли бы добавить туда аутентификацию датчика, и по крайней мере теперь у нас есть абстракция, в которой можно реализовать это в дальнейшем. Функции Azure хорошо масштабируются; если ваша очередь становится переполненной и вы используете функции по так называемому тарифному плану, например, функции будут просто увеличивать количество экземпляров для обработки нагрузки. Это действительно говорит о сути термина "беcсерверный".
Таким образом, вторая функция Azure создает более значимую часть данных (6) (например, мы добавляем информацию о конкретном типе датчика) и отправляет ее в раздел Azure Service Bus (7). Azure Service Bus - это механизм приема и распространения данных, который вполне способен принимать и обрабатывать миллионы сообщений во впечатляющие сроки - именно то, что нам может понадобиться для решения, основанного на датчике IoT. Это не единственный инструмент выбора в отношении приема массовых сообщений. Например, Microsoft предлагает специализированный концентратор событий IoT, и у других поставщиков будут свои предложения. Причины, по которым мы его выбрали, - это дешево, быстро, просто, и он отлично работает с функциями Azure. Azure Service Bus принимает сообщения двумя различными способами: непосредственно в очередь, в отличие от очереди хранения таблицы, хотя и с некоторыми значительно расширенными функциями. Однако для нашей цели мы будем использовать функцию Service Bus Topic, в которой мы отправляем сообщения в так называемую «тему», которую мы можем затем подписывать. Это общий механизм публикации-подписки, чаще всего связанный с различными реализациями служебной шины, и он хорошо работает с сценарием IoT, таким как этот. В конкретно нашей реализации мы создаем общее тему «полученное сообщение», и в это время мы отправляем каждый кусочек данных датчиков, которые получены и обогащены. Это обогащение данных в основном облегчает значимую фильтрацию сообщения в выделенные подписки (8). Простым примером может служить то, как датчик температуры отправляет считывание на приемную функцию Azure. Необработанные данные обогащаются информацией типа устройства, поэтому мы можем вывести тип датчика - температурный - с идентификатора устройства. Это обогащенное сообщение затем отправляется на служебную шину. Подписка на эту тему будет создана и будет собирать, например, любые сообщения от датчиков температуры с температурой, превышающей порог x градусов.
Преимущества этого, в сочетании с использованием приложения Azure Function, становятся совершенно ясными, поскольку мы реагируем на сообщения, которые подхватываются нашими различными темами-подписками, такими как подписка «TemperatureHigh». Подписки действуют как ничто иное, как фильтры сообщений и маршрутизаторы. Для того, чтобы потреблять имеющиеся у нас сообщения, как это зачастую бывает с платформой Azure, существует несколько способов обойти это. Для нашей реализации мы реализуем еще одну функцию Azure, специфичную для сообщений, отправляемых в подписку «TemperatureHigh». Это так просто - мы указываем имя подписки при создании функции, затем ее развертываем, а инфраструктура Azure видит, что функция запускается соответствующим образом (9). Нам не нужно постоянно проводить опрос, мы всегда так "привязаны". Это основное преимущество интеграции этих двух технологий, то есть возможность быстрого создания инфраструктуры, которая в равной степени способна и надежна. Недостаток, конечно же, заключается в том, что это очень эффективная, но жестко связанная архитектура - нет замены компонента служебной шины другим аналогичным продуктом облачного провайдера. Всегда есть необходимый компромисс : для наших целей эта связь работает очень хорошо: когда сообщения поступают на их выделенную подписку, срабатывает одинаково выделенная функция Azure, и сообщение, таким образом, потребляется и действует. Действие в этом случае заключается в выдаче соответствующей команды (10) для самого устройства или, возможно, для другого устройства. Например, при более высокой, чем обычно, температуре, мы можем выдать команду самому устройству, чтобы выпустить звуковой сигнал. Команда выполняется в таблице хранения Azure (11), где мы сохраняем историю выпущенных команд для целей аудита и визуализации. Именно из этой таблицы последняя функция Azure извлекает эту команду после периодического запроса (12) от сенсорного устройства, которое затем выполняет его.
Итак, это пример архитектуры IoT, основанной на облачных технологиях Azure, без центрального брокера. Еще раз, это не следует считать лучшей практикой для всех сценариев, пожалуйста, не применяйте его без учета обстоятельств, связанных с вашими собственными требованиями. 
Теперь, по некоторым техническим аспектам, мы хотели бы представить некоторые фрагменты кода Azure Functions и, таким образом, перейти к деталям реализации. 
Функция Azure, в которую отправляются необработанные данные с датчика, является HTTP-инициированной. Вот код, приведенный ниже:
[FunctionName("QueueRawValue")]
public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Anonymous, 
"get", "post", Route = "QueueRawValue")]HttpRequestMessage req, TraceWriter log)
{
    try
    {
        log.Info("C# QueueRawValue http trigger function processed a request.");

        string deviceId = req.GetQueryNameValuePairs().FirstOrDefault
        (q => string.Compare(q.Key, "deviceId", true) == 0).Value; 
        string value = req.GetQueryNameValuePairs().FirstOrDefault
        (q => string.Compare(q.Key, "value", true) == 0).Value;
        DateTime timestamp = DateTime.Now;

        CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
        var queueClient = cloudStorageAccount.CreateCloudQueueClient();
        var queueReference = queueClient.GetQueueReference("iotl15queue");
        // Create the queue if it doesn't already exist
        await queueReference.CreateIfNotExistsAsync();

        RawIngestDataModel data = new RawIngestDataModel
        {
            DeviceId = deviceId,
            RawValue = value
        };
        string queueMessage = JsonConvert.SerializeObject(data, Formatting.None);
        var message = new CloudQueueMessage(queueMessage);
        await queueReference.AddMessageAsync(message);

        return deviceId == null
            ? req.CreateResponse(HttpStatusCode.BadRequest, 
            "Please pass a deviceId on the query string or in the request body")
            : req.CreateResponse(HttpStatusCode.OK);
    }
    catch (Exception e)
    {
        // todo: do some logging
        throw e;
    }
}
Пожалуйста, не обращайте внимания на преднамеренно откровенное отсутствие соображений безопасности и других подобных проблем и сосредоточьтесь на основных функциях. Сырые json-данные от датчика помещаются в очередь хранения для последующей обработки. Это все, что делает функция, захватывая исходные входные данные из параметров запроса и сохраняя это в объекте «RawIngestDataModel», представляя только эти исходные входные данные в любом виде или форме. Таким образом, у нас есть очень простой способ захвата информации и хранения ее в очереди, в будущем - надеемся, быстрой и эффективной - обработки. На этом этапе мы могли обрабатывать необработанные данные, но этот проект дает нам точку расширения, которая нам может понадобиться, а затем: если бы число запросов внезапно увеличилось, очередь будет легко масштабировать, чтобы соответствовать этому требованию, благодаря своим встроенным облачным возможностям. 
Затем запускается следующая функция с помощью этой очереди:
[FunctionName("ProcessRawQueueMessage")]
public static void Run([QueueTrigger("iotl15queue", 
      Connection = "AzureStorageConnectionString")]string myQueueItem, TraceWriter log)
{
    try
    {
        RawIngestDataModel rawIngestData = 
           JsonConvert.DeserializeObject<RawIngestDataModel>(myQueueItem);
        
        CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
        var cloudService = new AzureTableStorageService(cloudStorageAccount);
        RegisteredValueModel registeredValueModel = 
                    CreateRegisteredDatamodelFromRawInput(rawIngestData);
        cloudService.SendRegisteredDataToTableStorage(registeredValueModel);
  
        // send to servicesbus
        string ServiceBusConnectionString = 
                     @"Endpoint=sb://myservicesbus.windows.net/;
              SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ssharedkeyD/2zayhog=";
        string TopicName = @"devicemessagetopic";
        ITopicClient topicClient = new TopicClient(ServiceBusConnectionString, TopicName);

        // Create a new message to send to the topic.
        string messageBody = JsonConvert.SerializeObject(registeredValueModel);
        var message = new Message(Encoding.UTF8.GetBytes(messageBody));

        message.UserProperties.Add("DeviceId", registeredValueModel.DeviceId);
        message.UserProperties.Add("TextValue", registeredValueModel.TextValue);
        message.UserProperties.Add("NumericalValue", registeredValueModel.NumericalValue);
        DeviceModel deviceInformation = GetDataAboutDevice(registeredValueModel.DeviceId);
        message.UserProperties.Add("DeviceType", deviceInformation.DeviceType);

        // TODO: enrich with device-type, etc.

        // Send the message to the topic.
        topicClient.SendAsync(message);

        log.Info($"C# Queue trigger function processed: {myQueueItem}");
    }
    catch (Exception e)
    {
        // todo: do some logging
        throw e;
    }
}

private static RegisteredValueModel CreateRegisteredDatamodelFromRawInput
                                              (RawIngestDataModel rawIngestData)
{
    RegisteredValueModel registeredValueModel = new RegisteredValueModel()
    {
        DeviceId = rawIngestData.DeviceId,
        TextValue = rawIngestData.RawValue,
    };

    float attemptToParseValueAsNumerical;
    if (float.TryParse(rawIngestData.RawValue, out attemptToParseValueAsNumerical))
        registeredValueModel.NumericalValue = attemptToParseValueAsNumerical;

    return registeredValueModel;
}

/// <summary>
/// Get device-data from table storage
/// </summary>
/// <remarks>
/// Return dummy data for now.
/// </remarks>
private static DeviceModel GetDataAboutDevice(string deviceId)
{
    // TODO: implement this. Consider memory caching.

    DeviceModel temporaryDeviceModel = new DeviceModel()
    {
        DeviceId = deviceId,
        DeviceType = "TemperatureMeasurementDevice"
    };
    return temporaryDeviceModel;
}
Вышеупомянутая функция отменяет данные из очереди для дальнейшей обработки. Основная часть работы уже сделана для нас, с точки зрения того, как подключиться к очереди и реагировать на новые записи в указанной очереди, все эти довольно важные функции уже подключены для нас и готовы к хорошему использованию. Кажется, что это слишком хорошо, чтобы быть правдой, не так ли? Почти кажется слишком хорошим, чтобы быть правдой, не так ли. Стоит вспомнить старую пословицу, «если это хорошо, чтобы быть правдой ...»: мы получаем огромное количество проверенных функций «бесплатно», так сказать, но, конечно, мы также отказываемся от возможности повлиять на что-либо - мы привязаны к платформе Azure. Это приемлемый выбор для нашей конкретной реализации IoT, но, возможно, не для вас - это плюсы и минусы и то, что вы должны серьезно рассмотреть в соответствии с вашим конкретным сценарием. Вышеприведенный код будет извлекать первые доступные необработанные данные из очереди и преобразовывать их в объект RegisteredValueModel. Обратите внимание, как это наследуется от объекта TableEntity;таким образом мы можем сохранить его в таблице хранения таблиц Azure. В наших целях я использую идентификатор устройства как ключ раздела на столе, так как это кажется естественным. Это не показано здесь, ради краткости. Из этой таблицы мы позже сможем сделать визуализации и исторические компиляции данных устройства. Самое важное, на данный момент, заключается в том, чтобы отметить, как данные зарегистрированного устройства отправляются на тему Azure Service Bus, с именем «devicemessagetopic», которое указывает, как этот раздел действительно получает все сообщения со всех устройств. Здесь останавливается ответственность за функцию Azure. Теперь мы можем пойти и создать подписки на эту тему, что касается наших конкретных случаев использования. Например, создавая вышеупомянутую подписку на опасно высокие температуры от наших температурных датчиков. «temperatureHighSubscription» - наше имя для этого, и, учитывая это имя и действительное соединение с служебной шиной, мы можем легко создать функцию Azure, которая запускается, когда служебная шина Azure фильтрует сообщения на эту подписку:
[FunctionName("GeneralHighTempTriggerFunction")]
public static async Task Run([ServiceBusTrigger("devicemessagetopic", 
"temperatureHighSubscription", 
Connection = "AzureServiceBusConnectionString")]string mySbMsg, TraceWriter log)
{
    log.Info($"C# ServiceBus topic trigger function processed message: {mySbMsg}");

    RegisteredValueModel dataFromServiceBusSubscription = 
    JsonConvert.DeserializeObject<RegisteredValueModel>(mySbMsg);

    // Add to commandModel history data table
    DeviceCommandModel deviceCommand = new DeviceCommandModel()
    {
        DeviceId = dataFromServiceBusSubscription.DeviceId,
        CommandText = "SoundAlarm",
        SentToDevice = false
    };

    CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
    var cloudService = new AzureTableStorageService(cloudStorageAccount);
    cloudService.SendDeviceCommandToTableStorage(deviceCommand);

    // Send notification of high temperature to azure logic app:
    INotificationService azureLogicAppNotificationService = 
                        new AzureLogicAppSendPushOverNotificationService();
    NotificationModel notification = new NotificationModel()
    {
        Title = "Temperature-alarm",
        Message = $"Temperature-device {dataFromServiceBusSubscription.DeviceId} 
                  at {dataFromServiceBusSubscription.NumericalValue:F} degrees",
        From = "IOT",
        To = "pushOverUserId" 
    };
    await azureLogicAppNotificationService.SendNotification(notification);
}
Это максимально просто, он уже подключен по дизайну, а функциональность, действующая на триггере, - это все, что нам остается реализовать. В нашем случае подписка - это призыв к действию, а именно срабатывание специальной команды для работы с высокой температурой и «SoundAlarm». Все команды хранятся в таблице хранения Azure, как для контрольного журнала, так и для репозитория команд: все устройства, если они настроены соответственно, могут непрерывно опробовать эту таблицу для любой команды, которая должна быть выполнена ими, - идентифицированной их идентификатором устройства. Быстрая функция Azure, http-triggered, производит доставку:
/// <summary>
/// Retrieves the latest non-yet-retrieved command for a device, if any such command exists.
/// </summary>
[FunctionName("GetCommandFromServicesBus")]
public static HttpResponseMessage Run([HttpTrigger(AuthorizationLevel.Anonymous, 
"get", "post", Route = "GetCommandFromServicesBus")]HttpRequestMessage req, TraceWriter log)
{
    string deviceId = req.GetQueryNameValuePairs().FirstOrDefault
    (q => string.Compare(q.Key, "deviceId", true) == 0).Value; //req.Content["deviceId"];

    CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
    var cloudService = new AzureTableStorageService(cloudStorageAccount);
    DeviceCommandModel commandForDevice = cloudService.GetCommandFromTableStorage(deviceId);

    return commandForDevice == null ?
        req.CreateResponse(HttpStatusCode.OK) // no commands found, just OK status
        :
        req.CreateResponse(HttpStatusCode.OK, 
        JsonConvert.SerializeObject(commandForDevice)); // command found, return as json.
}
И так по кругу; устройства отправляют данные, данные обогащаются, отправляются на шину служб и могут быть, возможно, не подхвачены подпиской, которая по очереди запускает команду, и так далее.
Мы не коснулись использования приложений Azure Logic, и мы не будем вдаваться в них, но обратите внимание, что мы реализуем пару для целей уведомления - например, в вышеупомянутом коде GeneralHighTempTriggerFunction. Azure Logic Apps дает нам возможность склеивания множества предложений Azure вместе, но пока это не так. Например, приложение Azure Logic может прослушивать подписные запросы на шине служб и компилировать несколько сообщений в одну команду на устройство или наоборот. Графический интерфейс, с помощью которого вы создаете приложения Logic Apps, интуитивно понятен, но обеспечивает высокий уровень сложности при выполнении. Вы также можете использовать его как сложную точку расширения и внешнюю бизнес-логику для других, например, в то время как вы заботитесь о данных самостоятельно.

No Comments

Add a Comment