엣지 컴퓨팅 가이드
Caffeine을 활용한 엣지 컴퓨팅 애플리케이션 개발 가이드입니다.
📋 개요
로컬에서 데이터를 처리하고 필요시 클라우드로 전송하는 엣지 애플리케이션을 구현합니다.
학습 시간: 60분
난이도: ⭐⭐⭐ 고급
🎯 엣지 컴퓨팅 장점
- ⚡ 낮은 지연시간: 로컬 처리로 ms 단위 응답
- 🔒 데이터 보안: 민감한 데이터는 로컬에만 보관
- 💰 비용 절감: 클라우드 전송량 최소화
- 🔄 오프라인 작동: 네트워크 끊김 시에도 작동
🏗️ 아키텍처
🔧 구현
엣지 데이터 처리
using Caffeine.Client;
using Caffeine.Core.Models;
namespace EdgeApp;
public class EdgeDataProcessor
{
private readonly CaffeineClient _client;
private readonly LocalDatabase _localDb;
private readonly CloudUploader _cloudUploader;
public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
await _client.ConnectAsync(cancellationToken);
// Subscribe to all tags
await _client.SubscribeTagAsync("*", ProcessTagValue, cancellationToken);
await Task.Delay(Timeout.Infinite, cancellationToken);
}
private async void ProcessTagValue(TagValue tagValue)
{
// 1. Local processing
var processed = ApplyLocalLogic(tagValue);
// 2. Save to local DB (always)
await _localDb.SaveAsync(processed);
// 3. Real-time dashboard update
await NotifyDashboardAsync(processed);
// 4. Cloud upload (conditional)
if (ShouldUploadToCloud(processed))
{
await _cloudUploader.UploadAsync(processed);
}
}
private ProcessedData ApplyLocalLogic(TagValue tagValue)
{
// Example: Temperature conversion, validation, etc.
var value = tagValue.Value;
if (value is double temp && tagValue.TagName.Contains("Temperature"))
{
// Convert Celsius to Fahrenheit
value = temp * 9/5 + 32;
}
return new ProcessedData
{
TagName = tagValue.TagName,
OriginalValue = tagValue.Value,
ProcessedValue = value,
Timestamp = tagValue.Timestamp,
Quality = tagValue.Quality
};
}
private bool ShouldUploadToCloud(ProcessedData data)
{
// Upload only significant changes or anomalies
if (data.TagName.Contains("Critical"))
{
return true;
}
// Upload aggregated data every 5 minutes
if (DateTime.UtcNow.Minute % 5 == 0)
{
return true;
}
return false;
}
}
로컬 데이터베이스 (SQLite)
using Microsoft.Data.Sqlite;
using Dapper;
public class LocalDatabase
{
private readonly string _connectionString;
public LocalDatabase(string dbPath = "edge_data.db")
{
_connectionString = $"Data Source={dbPath}";
InitializeDatabase();
}
private void InitializeDatabase()
{
using var connection = new SqliteConnection(_connectionString);
connection.Open();
connection.Execute(@"
CREATE TABLE IF NOT EXISTS TagValues (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
TagName TEXT NOT NULL,
Value TEXT NOT NULL,
Quality INTEGER NOT NULL,
Timestamp DATETIME NOT NULL
)");
connection. Execute(@"
CREATE INDEX IF NOT EXISTS idx_timestamp
ON TagValues(Timestamp DESC)");
}
public async Task SaveAsync(ProcessedData data)
{
using var connection = new SqliteConnection(_connectionString);
await connection.ExecuteAsync(@"
INSERT INTO TagValues (TagName, Value, Quality, Timestamp)
VALUES (@TagName, @Value, @Quality, @Timestamp)",
new
{
data.TagName,
Value = data.ProcessedValue.ToString(),
Quality = (int)data.Quality,
data.Timestamp
});
}
public async Task<List<ProcessedData>> GetRecentAsync(
string tagName,
int count = 100)
{
using var connection = new SqliteConnection(_connectionString);
var results = await connection.QueryAsync(@"
SELECT TagName, Value, Quality, Timestamp
FROM TagValues
WHERE TagName = @TagName
ORDER BY Timestamp DESC
LIMIT @Count",
new { TagName = tagName, Count = count });
return results.Select(r => new ProcessedData
{
TagName = r.TagName,
ProcessedValue = r.Value,
Quality = (TagQuality)r.Quality,
Timestamp = r.Timestamp
}).ToList();
}
}
클라우드 업로더
using Azure.Storage.Queues;
public class CloudUploader
{
private readonly QueueClient _queueClient;
private readonly Queue<ProcessedData> _uploadQueue = new();
private readonly Timer _batchUploadTimer;
public CloudUploader(string connectionString, string queueName)
{
_queueClient = new QueueClient(connectionString, queueName);
_queueClient.CreateIfNotExists();
// Batch upload every minute
_batchUploadTimer = new Timer(
BatchUploadCallback,
null,
TimeSpan.FromMinutes(1),
TimeSpan.FromMinutes(1));
}
public async Task UploadAsync(ProcessedData data)
{
lock (_uploadQueue)
{
_uploadQueue.Enqueue(data);
}
// If queue is too large, upload immediately
if (_uploadQueue.Count >= 1000)
{
await FlushQueueAsync();
}
}
private async void BatchUploadCallback(object? state)
{
await FlushQueueAsync();
}
private async Task FlushQueueAsync()
{
List<ProcessedData> batch;
lock (_uploadQueue)
{
if (_uploadQueue.Count == 0) return;
batch = new List<ProcessedData>(_uploadQueue);
_uploadQueue.Clear();
}
try
{
var message = JsonSerializer.Serialize(batch);
await _queueClient.SendMessageAsync(message);
Console.WriteLine($"Uploaded {batch.Count} records to cloud");
}
catch (Exception ex)
{
Console.WriteLine($"Cloud upload failed: {ex.Message}");
// Re-queue on failure
lock (_uploadQueue)
{
foreach (var item in batch)
{
_uploadQueue.Enqueue(item);
}
}
}
}
}
📊 모범 사례
1. 데이터 필터링
// Only upload data that changed significantly
private bool HasSignificantChange(double oldValue, double newValue)
{
var threshold = 0.1; // 10% change
return Math.Abs((newValue - oldValue) / oldValue) > threshold;
}
2. 데이터 집계
// Aggregate to 1-minute averages before cloud upload
public class DataAggregator
{
private readonly Dictionary<string, List<double>> _buffer = new();
public void Add(string tagName, double value)
{
if (!_buffer.ContainsKey(tagName))
{
_buffer[tagName] = new List<double>();
}
_buffer[tagName].Add(value);
}
public Dictionary<string, AggregatedValue> GetAndClear()
{
var result = new Dictionary<string, AggregatedValue>();
foreach (var (tagName, values) in _buffer)
{
result[tagName] = new AggregatedValue
{
Average = values.Average(),
Min = values.Min(),
Max = values.Max(),
Count = values.Count
};
}
_buffer.Clear();
return result;
}
}
3. 오프라인 대응
// Persist data locally when cloud is unavailable
public class ResilientUploader
{
private bool _isCloudAvailable = true;
public async Task<bool> TryUploadAsync(ProcessedData data)
{
if (!_isCloudAvailable)
{
await SaveToLocalStorageAsync(data);
return false;
}
try
{
await _cloudUploader.UploadAsync(data);
_isCloudAvailable = true;
// Retry any pending local data
await RetryLocalDataAsync();
return true;
}
catch (Exception)
{
_isCloudAvailable = false;
await SaveToLocalStorageAsync(data);
return false;
}
}
}
📚 참고 자료
작성일: 2026-01-28
버전: 2.0