데이터 수집 애플리케이션
Caffeine Client SDK를 사용한 데이터 수집 애플리케이션 개발 가이드입니다.
📋 개요
실시간 산업 데이터를 수집, 처리, 저장하는 애플리케이션을 개발합니다.
학습 시간:45분
난이도: ⭐⭐ 중급
🏗️ 아키텍처
🔧 구현
프로젝트 생성
cafe init --name DataCollectorApp --template app
cd DataCollectorApp
# 필요한 패키지
dotnet add package NEXCODE.Caffeine.Client
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Serilog.Extensions.Hosting
BackgroundService 구현
using Caffeine.Client;
using Caffeine.Core.Models;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace DataCollectorApp;
public class DataCollectionService : BackgroundService
{
private readonly CaffeineClient _client;
private readonly ILogger<DataCollectionService> _logger;
private readonly List<string> _tagNames;
private readonly int _intervalMs;
public DataCollectionService(
CaffeineClient client,
ILogger<DataCollectionService> logger,
IConfiguration config)
{
_client = client;
_logger = logger;
_tagNames = config.GetSection("Tags").Get<List<string>>()
?? throw new ArgumentException("Tags configuration missing");
_intervalMs = config.GetValue<int>("CollectionInterval", 1000);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Data Collection Service started");
// Connect to Caffeine Server
if (!await _client.ConnectAsync(stoppingToken))
{
_logger.LogError("Failed to connect to Caffeine Server");
return;
}
// Start collection loop
while (!stoppingToken.IsCancellationRequested)
{
try
{
await CollectDataAsync(stoppingToken);
await Task.Delay(_intervalMs, stoppingToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Collection error");
await Task.Delay(5000, stoppingToken); // Wait before retry
}
}
await _client.DisconnectAsync();
_logger.LogInformation("Data Collection Service stopped");
}
private async Task CollectDataAsync(CancellationToken cancellationToken)
{
// Batch read for efficiency
var values = await _client.ReadTagsAsync(
_tagNames.ToArray(),
cancellationToken);
foreach (var (tagName, tagValue) in values)
{
if (tagValue.Quality == TagQuality.Good)
{
_logger.LogDebug("{TagName} = {Value} @ {Timestamp}",
tagName, tagValue.Value, tagValue.Timestamp);
// Process data (storage, analysis, etc.)
await ProcessTagValueAsync(tagValue, cancellationToken);
}
else
{
_logger.LogWarning("{TagName} quality: {Quality}",
tagName, tagValue.Quality);
}
}
}
private async Task ProcessTagValueAsync(
TagValue tagValue,
CancellationToken cancellationToken)
{
// Example: Save to database, send to cloud, etc.
// Implementation depends on your requirements
await Task.CompletedTask;
}
}
Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Caffeine.Client;
using Serilog;
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.WriteTo.File("logs/collector-.txt", rollingInterval: RollingInterval.Day)
.CreateLogger();
try
{
var host = Host.CreateDefaultBuilder(args)
.UseSerilog()
.ConfigureServices((context, services) =>
{
// Caffeine Client
services.AddSingleton(sp =>
{
var config = context.Configuration;
var options = new CaffeineClientOptions
{
Server Url = config["Caffeine:ServerUrl"] ?? "https://localhost:5001",
ApiKey = config["Caffeine:ApiKey"],
UseSignalR = true,
Timeout = TimeSpan.FromSeconds(30)
};
return new CaffeineClient(options);
});
// Background Service
services.AddHostedService<DataCollectionService>();
})
.Build();
await host.RunAsync();
}
catch (Exception ex)
{
Log.Fatal(ex, "Application terminated unexpectedly");
}
finally
{
Log.CloseAndFlush();
}
appsettings.json
{
"Caffeine": {
"ServerUrl": "https://localhost:5001",
"ApiKey": "your-api-key-here"
},
"CollectionInterval": 1000,
"Tags": [
"Equipment1.Temperature",
"Equipment1.Pressure",
"Equipment1.Flow",
"Equipment1.Status"
],
"Serilog": {
"MinimumLevel": {
"Default": "Information",
"Override": {
"Microsoft": "Warning",
"System": "Warning"
}
}
}
}
📊 고급 기능
실시간 구독
public class RealtimeCollectionService : BackgroundService
{
private readonly CaffeineClient _client;
private readonly ILogger _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _client.ConnectAsync(stoppingToken);
// Subscribe to real-time changes
foreach (var tagName in _tagNames)
{
await _client.SubscribeTagAsync(tagName, OnTagChanged, stoppingToken);
}
// Keep service running
await Task.Delay(Timeout.Infinite, stoppingToken);
}
private void OnTagChanged(TagValue tagValue)
{
_logger.LogInformation("Changed: {TagName} = {Value}",
tagValue.TagName, tagValue.Value);
// Process real-time data
}
}
로컬 버퍼링
public class BufferedCollectionService : BackgroundService
{
private readonly Queue<TagValue> _buffer = new();
private readonly SemaphoreSlim _semaphore = new(1, 1);
private const int MaxBufferSize = 10000;
private async Task CollectDataAsync(CancellationToken cancellationToken)
{
var values = await _client.ReadTagsAsync(_tagNames.ToArray(), cancellationToken);
await _semaphore.WaitAsync(cancellationToken);
try
{
foreach (var (_, tagValue) in values)
{
if (_buffer.Count >= MaxBufferSize)
{
_buffer.Dequeue(); // Remove oldest
}
_buffer.Enqueue(tagValue);
}
}
finally
{
_semaphore.Release();
}
}
private async Task FlushBufferAsync(CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellation Token);
try
{
while (_buffer.Count > 0)
{
var tagValue = _buffer.Dequeue();
await SaveToCloudAsync(tagValue, cancellationToken);
}
}
finally
{
_semaphore.Release();
}
}
}
📚 참고 자료
작성일: 2026-01-28
버전: 2.0