본문으로 건너뛰기

엣지 컴퓨팅 가이드

Caffeine을 활용한 엣지 컴퓨팅 애플리케이션 개발 가이드입니다.

📋 개요

로컬에서 데이터를 처리하고 필요시 클라우드로 전송하는 엣지 애플리케이션을 구현합니다.

학습 시간: 60분
난이도: ⭐⭐⭐ 고급


🎯 엣지 컴퓨팅 장점

  • 낮은 지연시간: 로컬 처리로 ms 단위 응답
  • 🔒 데이터 보안: 민감한 데이터는 로컬에만 보관
  • 💰 비용 절감: 클라우드 전송량 최소화
  • 🔄 오프라인 작동: 네트워크 끊김 시에도 작동

🏗️ 아키텍처


🔧 구현

엣지 데이터 처리

using Caffeine.Client;
using Caffeine.Core.Models;

namespace EdgeApp;

public class EdgeDataProcessor
{
private readonly CaffeineClient _client;
private readonly LocalDatabase _localDb;
private readonly CloudUploader _cloudUploader;

public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
await _client.ConnectAsync(cancellationToken);

// Subscribe to all tags
await _client.SubscribeTagAsync("*", ProcessTagValue, cancellationToken);

await Task.Delay(Timeout.Infinite, cancellationToken);
}

private async void ProcessTagValue(TagValue tagValue)
{
// 1. Local processing
var processed = ApplyLocalLogic(tagValue);

// 2. Save to local DB (always)
await _localDb.SaveAsync(processed);

// 3. Real-time dashboard update
await NotifyDashboardAsync(processed);

// 4. Cloud upload (conditional)
if (ShouldUploadToCloud(processed))
{
await _cloudUploader.UploadAsync(processed);
}
}

private ProcessedData ApplyLocalLogic(TagValue tagValue)
{
// Example: Temperature conversion, validation, etc.
var value = tagValue.Value;

if (value is double temp && tagValue.TagName.Contains("Temperature"))
{
// Convert Celsius to Fahrenheit
value = temp * 9/5 + 32;
}

return new ProcessedData
{
TagName = tagValue.TagName,
OriginalValue = tagValue.Value,
ProcessedValue = value,
Timestamp = tagValue.Timestamp,
Quality = tagValue.Quality
};
}

private bool ShouldUploadToCloud(ProcessedData data)
{
// Upload only significant changes or anomalies
if (data.TagName.Contains("Critical"))
{
return true;
}

// Upload aggregated data every 5 minutes
if (DateTime.UtcNow.Minute % 5 == 0)
{
return true;
}

return false;
}
}

로컬 데이터베이스 (SQLite)

using Microsoft.Data.Sqlite;
using Dapper;

public class LocalDatabase
{
private readonly string _connectionString;

public LocalDatabase(string dbPath = "edge_data.db")
{
_connectionString = $"Data Source={dbPath}";
InitializeDatabase();
}

private void InitializeDatabase()
{
using var connection = new SqliteConnection(_connectionString);
connection.Open();

connection.Execute(@"
CREATE TABLE IF NOT EXISTS TagValues (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
TagName TEXT NOT NULL,
Value TEXT NOT NULL,
Quality INTEGER NOT NULL,
Timestamp DATETIME NOT NULL
)");

connection. Execute(@"
CREATE INDEX IF NOT EXISTS idx_timestamp
ON TagValues(Timestamp DESC)");
}

public async Task SaveAsync(ProcessedData data)
{
using var connection = new SqliteConnection(_connectionString);

await connection.ExecuteAsync(@"
INSERT INTO TagValues (TagName, Value, Quality, Timestamp)
VALUES (@TagName, @Value, @Quality, @Timestamp)",
new
{
data.TagName,
Value = data.ProcessedValue.ToString(),
Quality = (int)data.Quality,
data.Timestamp
});
}

public async Task<List<ProcessedData>> GetRecentAsync(
string tagName,
int count = 100)
{
using var connection = new SqliteConnection(_connectionString);

var results = await connection.QueryAsync(@"
SELECT TagName, Value, Quality, Timestamp
FROM TagValues
WHERE TagName = @TagName
ORDER BY Timestamp DESC
LIMIT @Count",
new { TagName = tagName, Count = count });

return results.Select(r => new ProcessedData
{
TagName = r.TagName,
ProcessedValue = r.Value,
Quality = (TagQuality)r.Quality,
Timestamp = r.Timestamp
}).ToList();
}
}

클라우드 업로더

using Azure.Storage.Queues;

public class CloudUploader
{
private readonly QueueClient _queueClient;
private readonly Queue<ProcessedData> _uploadQueue = new();
private readonly Timer _batchUploadTimer;

public CloudUploader(string connectionString, string queueName)
{
_queueClient = new QueueClient(connectionString, queueName);
_queueClient.CreateIfNotExists();

// Batch upload every minute
_batchUploadTimer = new Timer(
BatchUploadCallback,
null,
TimeSpan.FromMinutes(1),
TimeSpan.FromMinutes(1));
}

public async Task UploadAsync(ProcessedData data)
{
lock (_uploadQueue)
{
_uploadQueue.Enqueue(data);
}

// If queue is too large, upload immediately
if (_uploadQueue.Count >= 1000)
{
await FlushQueueAsync();
}
}

private async void BatchUploadCallback(object? state)
{
await FlushQueueAsync();
}

private async Task FlushQueueAsync()
{
List<ProcessedData> batch;

lock (_uploadQueue)
{
if (_uploadQueue.Count == 0) return;

batch = new List<ProcessedData>(_uploadQueue);
_uploadQueue.Clear();
}

try
{
var message = JsonSerializer.Serialize(batch);
await _queueClient.SendMessageAsync(message);

Console.WriteLine($"Uploaded {batch.Count} records to cloud");
}
catch (Exception ex)
{
Console.WriteLine($"Cloud upload failed: {ex.Message}");

// Re-queue on failure
lock (_uploadQueue)
{
foreach (var item in batch)
{
_uploadQueue.Enqueue(item);
}
}
}
}
}

📊 모범 사례

1. 데이터 필터링

// Only upload data that changed significantly
private bool HasSignificantChange(double oldValue, double newValue)
{
var threshold = 0.1; // 10% change
return Math.Abs((newValue - oldValue) / oldValue) > threshold;
}

2. 데이터 집계

// Aggregate to 1-minute averages before cloud upload
public class DataAggregator
{
private readonly Dictionary<string, List<double>> _buffer = new();

public void Add(string tagName, double value)
{
if (!_buffer.ContainsKey(tagName))
{
_buffer[tagName] = new List<double>();
}
_buffer[tagName].Add(value);
}

public Dictionary<string, AggregatedValue> GetAndClear()
{
var result = new Dictionary<string, AggregatedValue>();

foreach (var (tagName, values) in _buffer)
{
result[tagName] = new AggregatedValue
{
Average = values.Average(),
Min = values.Min(),
Max = values.Max(),
Count = values.Count
};
}

_buffer.Clear();
return result;
}
}

3. 오프라인 대응

// Persist data locally when cloud is unavailable
public class ResilientUploader
{
private bool _isCloudAvailable = true;

public async Task<bool> TryUploadAsync(ProcessedData data)
{
if (!_isCloudAvailable)
{
await SaveToLocalStorageAsync(data);
return false;
}

try
{
await _cloudUploader.UploadAsync(data);
_isCloudAvailable = true;

// Retry any pending local data
await RetryLocalDataAsync();

return true;
}
catch (Exception)
{
_isCloudAvailable = false;
await SaveToLocalStorageAsync(data);
return false;
}
}
}

📚 참고 자료


작성일: 2026-01-28
버전: 2.0