Бессерверная работа с IoT и функциями Azure
Концепция интернет-вещей фигурирует почти в каждой статье о будущих перспективах ИТ. И это воистину увлекательная тема, ее сценарии безграничны, а случаи использования хорошо прощупываются, кажется, что мы могли бы реализовать их много лет назад. Но давайте отбросим философские понятия; мы хотели бы продемонстрировать технологии для решения IoT на основе бессерверных предложений Azure. Давайте не будем медлить.
Интернет вещей (IoT) - может быть реализован множеством способов. Сегодня мы расскажем об использовании облачных функций Microsoft Azure, в частности использовании приложений Azure Function, Azure Service Bus и Azure Storage - с очень небольшим количеством приложений Azure Logic.Так что здесь будет много об Azure, в частности - о бессерверных приложений. Если вы еще не знакомы с термином "бессерверный", то мы даем ему такое определение, как использование ресурсов без локального содержания. Конечно, мы все хорошо знаем, как ни один запрос на определенный ресурс никогда не будет «без сервера»; до тех пор, пока в глубине многоуровневого стека есть какой-то физический аппаратный элемент, которому суждено обеспечить ответ на запрос. Таким образом, термин «бессерверный» не следует воспринимать буквально, в отличие от предоставления прозвища для загрузки и запуска ресурсов и ответа на запросы, не заботясь о физических ограничениях оборудования. Так что вам не нужно, в честь фразы термина, настроить фактический сервер для ответа на запросы, вместо этого вы можете просто развернуть сам механизм ответа на облако Azure и вместо того, чтобы ориентироваться на аппаратное метрики - сколько процессоров, будь то развертывание балансировки нагрузки, такие соображения - вы можете сосредоточиться на том, как вы хотите, чтобы механизм ответа работал, например, как масштабировать. Таким образом, вам не нужно настраивать фактический сервер для ответа на запросы, вместо этого вы можете просто развернуть сам механизм ответа на облако Azure и вместо того, чтобы ориентироваться на аппаратные метрики - число процессоров, как развернуть балансировку нагрузки и подобные соображения - вы можете сосредоточиться на том, как вы хотите, чтобы работал механизм ответа, например, как масштабироваться.
Утомительные ограничения в сторону, давайте начинать! Пожалуйста, обратите внимание на приведенную ниже диаграмму архитектуры, когда мы проходим через простой случай использования, рассматривая технологический выбор.
Устройство распределяет свое считывание на Azure Function (1) посредством HTTPS REST-вызова. Функция отвечает за хранение необработанных данных в очереди Azure Storage (2). Функции Azure чрезвычайно дешевы для выполнения - у вас могут быть миллионы выполнений по очень низкой цене, что идеально подходит для сети датчиков, которые часто отправляют данные. Подобный финансовый аргумент применяется к учетной записи Azure Storage, которую мы будем использовать для хранения данных датчиков. Хранение очереди идеально подходит для наших целей; оно предназначено для хранения рабочих элементов, которые будут обработаны на каком-то более позднем этапе, после чего данные будут просто удалены из очереди - и мы также можем указать выделенный жизненный цикл для данных, если это необходимо. Но, самое главное, мы можем использовать специальный триггер приложения приложения Azure (3), который активирует новые элементы в очереди. В этом конкретном случае он извлекает данные из очереди и, исходя из необработанных данных датчика, создает более конкретную и обогащенную модель данных. Мы, конечно, могли бы сделать это в первой функции Azure, но важна абстракция, поскольку она позволяет нам вводить здесь бизнес-логику, если это понадобится позже. В настоящее время единственной логикой является извлечение информации сенсорного устройства из таблицы хранения Azure (4) и добавление бит этой информации в сообщение устройства, которое затем переходит в таблицу хранения Azure, которая содержит данные датчика (5) - но позже мы могли бы добавить туда аутентификацию датчика, и по крайней мере теперь у нас есть абстракция, в которой можно реализовать это в дальнейшем. Функции Azure хорошо масштабируются; если ваша очередь становится переполненной и вы используете функции по так называемому тарифному плану, например, функции будут просто увеличивать количество экземпляров для обработки нагрузки. Это действительно говорит о сути термина "беcсерверный".
[FunctionName("QueueRawValue")]
public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Anonymous,
"get", "post", Route = "QueueRawValue")]HttpRequestMessage req, TraceWriter log)
{
try
{
log.Info("C# QueueRawValue http trigger function processed a request.");
string deviceId = req.GetQueryNameValuePairs().FirstOrDefault
(q => string.Compare(q.Key, "deviceId", true) == 0).Value;
string value = req.GetQueryNameValuePairs().FirstOrDefault
(q => string.Compare(q.Key, "value", true) == 0).Value;
DateTime timestamp = DateTime.Now;
CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
var queueClient = cloudStorageAccount.CreateCloudQueueClient();
var queueReference = queueClient.GetQueueReference("iotl15queue");
// Create the queue if it doesn't already exist
await queueReference.CreateIfNotExistsAsync();
RawIngestDataModel data = new RawIngestDataModel
{
DeviceId = deviceId,
RawValue = value
};
string queueMessage = JsonConvert.SerializeObject(data, Formatting.None);
var message = new CloudQueueMessage(queueMessage);
await queueReference.AddMessageAsync(message);
return deviceId == null
? req.CreateResponse(HttpStatusCode.BadRequest,
"Please pass a deviceId on the query string or in the request body")
: req.CreateResponse(HttpStatusCode.OK);
}
catch (Exception e)
{
// todo: do some logging
throw e;
}
}
[FunctionName("ProcessRawQueueMessage")]
public static void Run([QueueTrigger("iotl15queue",
Connection = "AzureStorageConnectionString")]string myQueueItem, TraceWriter log)
{
try
{
RawIngestDataModel rawIngestData =
JsonConvert.DeserializeObject<RawIngestDataModel>(myQueueItem);
CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
var cloudService = new AzureTableStorageService(cloudStorageAccount);
RegisteredValueModel registeredValueModel =
CreateRegisteredDatamodelFromRawInput(rawIngestData);
cloudService.SendRegisteredDataToTableStorage(registeredValueModel);
// send to servicesbus
string ServiceBusConnectionString =
@"Endpoint=sb://myservicesbus.windows.net/;
SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ssharedkeyD/2zayhog=";
string TopicName = @"devicemessagetopic";
ITopicClient topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
// Create a new message to send to the topic.
string messageBody = JsonConvert.SerializeObject(registeredValueModel);
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
message.UserProperties.Add("DeviceId", registeredValueModel.DeviceId);
message.UserProperties.Add("TextValue", registeredValueModel.TextValue);
message.UserProperties.Add("NumericalValue", registeredValueModel.NumericalValue);
DeviceModel deviceInformation = GetDataAboutDevice(registeredValueModel.DeviceId);
message.UserProperties.Add("DeviceType", deviceInformation.DeviceType);
// TODO: enrich with device-type, etc.
// Send the message to the topic.
topicClient.SendAsync(message);
log.Info($"C# Queue trigger function processed: {myQueueItem}");
}
catch (Exception e)
{
// todo: do some logging
throw e;
}
}
private static RegisteredValueModel CreateRegisteredDatamodelFromRawInput
(RawIngestDataModel rawIngestData)
{
RegisteredValueModel registeredValueModel = new RegisteredValueModel()
{
DeviceId = rawIngestData.DeviceId,
TextValue = rawIngestData.RawValue,
};
float attemptToParseValueAsNumerical;
if (float.TryParse(rawIngestData.RawValue, out attemptToParseValueAsNumerical))
registeredValueModel.NumericalValue = attemptToParseValueAsNumerical;
return registeredValueModel;
}
/// <summary>
/// Get device-data from table storage
/// </summary>
/// <remarks>
/// Return dummy data for now.
/// </remarks>
private static DeviceModel GetDataAboutDevice(string deviceId)
{
// TODO: implement this. Consider memory caching.
DeviceModel temporaryDeviceModel = new DeviceModel()
{
DeviceId = deviceId,
DeviceType = "TemperatureMeasurementDevice"
};
return temporaryDeviceModel;
}
[FunctionName("GeneralHighTempTriggerFunction")]
public static async Task Run([ServiceBusTrigger("devicemessagetopic",
"temperatureHighSubscription",
Connection = "AzureServiceBusConnectionString")]string mySbMsg, TraceWriter log)
{
log.Info($"C# ServiceBus topic trigger function processed message: {mySbMsg}");
RegisteredValueModel dataFromServiceBusSubscription =
JsonConvert.DeserializeObject<RegisteredValueModel>(mySbMsg);
// Add to commandModel history data table
DeviceCommandModel deviceCommand = new DeviceCommandModel()
{
DeviceId = dataFromServiceBusSubscription.DeviceId,
CommandText = "SoundAlarm",
SentToDevice = false
};
CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
var cloudService = new AzureTableStorageService(cloudStorageAccount);
cloudService.SendDeviceCommandToTableStorage(deviceCommand);
// Send notification of high temperature to azure logic app:
INotificationService azureLogicAppNotificationService =
new AzureLogicAppSendPushOverNotificationService();
NotificationModel notification = new NotificationModel()
{
Title = "Temperature-alarm",
Message = $"Temperature-device {dataFromServiceBusSubscription.DeviceId}
at {dataFromServiceBusSubscription.NumericalValue:F} degrees",
From = "IOT",
To = "pushOverUserId"
};
await azureLogicAppNotificationService.SendNotification(notification);
}
/// <summary>
/// Retrieves the latest non-yet-retrieved command for a device, if any such command exists.
/// </summary>
[FunctionName("GetCommandFromServicesBus")]
public static HttpResponseMessage Run([HttpTrigger(AuthorizationLevel.Anonymous,
"get", "post", Route = "GetCommandFromServicesBus")]HttpRequestMessage req, TraceWriter log)
{
string deviceId = req.GetQueryNameValuePairs().FirstOrDefault
(q => string.Compare(q.Key, "deviceId", true) == 0).Value; //req.Content["deviceId"];
CloudStorageAccount cloudStorageAccount = CloudConfigurationFactory.GetCloudStorageAccount();
var cloudService = new AzureTableStorageService(cloudStorageAccount);
DeviceCommandModel commandForDevice = cloudService.GetCommandFromTableStorage(deviceId);
return commandForDevice == null ?
req.CreateResponse(HttpStatusCode.OK) // no commands found, just OK status
:
req.CreateResponse(HttpStatusCode.OK,
JsonConvert.SerializeObject(commandForDevice)); // command found, return as json.
}