본문으로 건너뛰기

데이터 수집 애플리케이션

Caffeine Client SDK를 사용한 데이터 수집 애플리케이션 개발 가이드입니다.

📋 개요

실시간 산업 데이터를 수집, 처리, 저장하는 애플리케이션을 개발합니다.

학습 시간: 45분
난이도: ⭐⭐ 중급


🏗️ 아키텍처


🔧 구현

프로젝트 생성

cafe init --name DataCollectorApp --template app
cd DataCollectorApp

# 필요한 패키지
dotnet add package NEXCODE.Caffeine.Client
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Serilog.Extensions.Hosting
dotnet add package Serilog.Sinks.Console
dotnet add package Serilog.Sinks.File

BackgroundService 구현

using Caffeine.Client;
using Caffeine.Core.Models;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace DataCollectorApp;

/// <summary>
/// Hosted background service that periodically batch-reads the configured tags
/// from the Caffeine server and forwards every good-quality value to
/// <see cref="ProcessTagValueAsync"/>.
/// </summary>
public class DataCollectionService : BackgroundService
{
    /// <summary>Back-off applied after a failed collection cycle, in milliseconds.</summary>
    private const int RetryDelayMs = 5000;

    private readonly CaffeineClient _client;
    private readonly ILogger<DataCollectionService> _logger;
    private readonly List<string> _tagNames;
    private readonly int _intervalMs;

    /// <summary>
    /// Creates the service and reads its settings from configuration.
    /// </summary>
    /// <param name="client">Client used to connect to the server and read tags.</param>
    /// <param name="logger">Logger for lifecycle and collection messages.</param>
    /// <param name="config">
    /// Configuration providing the <c>Tags</c> list and the optional
    /// <c>CollectionInterval</c> (milliseconds, defaults to 1000).
    /// </param>
    /// <exception cref="ArgumentException">The <c>Tags</c> section is missing.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><c>CollectionInterval</c> is zero or negative.</exception>
    public DataCollectionService(
        CaffeineClient client,
        ILogger<DataCollectionService> logger,
        IConfiguration config)
    {
        _client = client;
        _logger = logger;
        _tagNames = config.GetSection("Tags").Get<List<string>>()
            ?? throw new ArgumentException("Tags configuration missing");
        _intervalMs = config.GetValue<int>("CollectionInterval", 1000);

        // Fail fast on a misconfigured interval: 0 would spin the loop without
        // pause, and -1 makes Task.Delay wait forever after the first cycle.
        if (_intervalMs <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(config),
                _intervalMs,
                "CollectionInterval must be a positive number of milliseconds");
        }
    }

    /// <summary>
    /// Connects to the server, runs the collection loop until the host requests
    /// shutdown, then disconnects.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Data Collection Service started");

        // Connect to Caffeine Server
        if (!await _client.ConnectAsync(stoppingToken))
        {
            _logger.LogError("Failed to connect to Caffeine Server");
            return;
        }

        try
        {
            await RunCollectionLoopAsync(stoppingToken);
        }
        finally
        {
            // Always release the connection, whichever way the loop ended.
            await _client.DisconnectAsync();
            _logger.LogInformation("Data Collection Service stopped");
        }
    }

    /// <summary>
    /// Repeats collect-then-wait until <paramref name="stoppingToken"/> is cancelled.
    /// A failed cycle is logged and retried after <see cref="RetryDelayMs"/>.
    /// </summary>
    private async Task RunCollectionLoopAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var delayMs = _intervalMs;

            try
            {
                await CollectDataAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Host shutdown. A cancellation that did NOT come from the stopping
                // token (e.g. an internal timeout) falls through to the generic
                // handler below and is retried instead of ending the service.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Collection error");
                delayMs = RetryDelayMs; // Wait before retry
            }

            try
            {
                // The wait lives outside the catch block above so that shutdown
                // during the back-off cannot escape the loop as an exception.
                await Task.Delay(delayMs, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Reads all configured tags in one batch call, processes the values whose
    /// quality is <c>Good</c>, and logs a warning for the others.
    /// </summary>
    private async Task CollectDataAsync(CancellationToken cancellationToken)
    {
        // Batch read for efficiency
        var values = await _client.ReadTagsAsync(
            _tagNames.ToArray(),
            cancellationToken);

        foreach (var (tagName, tagValue) in values)
        {
            if (tagValue.Quality != TagQuality.Good)
            {
                _logger.LogWarning("{TagName} quality: {Quality}",
                    tagName, tagValue.Quality);
                continue;
            }

            _logger.LogDebug("{TagName} = {Value} @ {Timestamp}",
                tagName, tagValue.Value, tagValue.Timestamp);

            // Process data (storage, analysis, etc.)
            await ProcessTagValueAsync(tagValue, cancellationToken);
        }
    }

    /// <summary>
    /// Extension point for handling one good-quality value. The sample does nothing.
    /// </summary>
    /// <param name="tagValue">The value that was read.</param>
    /// <param name="cancellationToken">Token to pass to any I/O performed here.</param>
    private Task ProcessTagValueAsync(
        TagValue tagValue,
        CancellationToken cancellationToken)
    {
        // Example: Save to database, send to cloud, etc.
        // Implementation depends on your requirements
        return Task.CompletedTask;
    }
}

Program.cs

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Caffeine.Client;
using Serilog;

// Configure the static Serilog logger in code: console output plus a file
// that rolls daily. No ReadFrom.Configuration call is made here, so the sinks
// and levels come from this code only.
Log.Logger = new LoggerConfiguration()
    .WriteTo.Console()
    .WriteTo.File("logs/collector-.txt", rollingInterval: RollingInterval.Day)
    .CreateLogger();

try
{
    var host = Host.CreateDefaultBuilder(args)
        .UseSerilog()
        .ConfigureServices((context, services) =>
        {
            // Caffeine Client: a single shared instance built from the
            // "Caffeine" configuration section.
            services.AddSingleton(sp =>
            {
                var config = context.Configuration;
                var options = new CaffeineClientOptions
                {
                    // Falls back to the local development endpoint when unset.
                    ServerUrl = config["Caffeine:ServerUrl"] ?? "https://localhost:5001",
                    ApiKey = config["Caffeine:ApiKey"],
                    UseSignalR = true,
                    Timeout = TimeSpan.FromSeconds(30)
                };
                return new CaffeineClient(options);
            });

            // Background Service
            services.AddHostedService<DataCollectionService>();
        })
        .Build();

    await host.RunAsync();
}
catch (Exception ex)
{
    Log.Fatal(ex, "Application terminated unexpectedly");
}
finally
{
    // Flush buffered log events before the process exits.
    Log.CloseAndFlush();
}

appsettings.json

{
"Caffeine": {
"ServerUrl": "https://localhost:5001",
"ApiKey": "your-api-key-here"
},
"CollectionInterval": 1000,
"Tags": [
"Equipment1.Temperature",
"Equipment1.Pressure",
"Equipment1.Flow",
"Equipment1.Status"
],
"Serilog": {
"MinimumLevel": {
"Default": "Information",
"Override": {
"Microsoft": "Warning",
"System": "Warning"
}
}
}
}

📊 고급 기능

실시간 구독

/// <summary>
/// Hosted background service that subscribes to change notifications for the
/// configured tags instead of polling them.
/// </summary>
public class RealtimeCollectionService : BackgroundService
{
    private readonly CaffeineClient _client;
    private readonly ILogger<RealtimeCollectionService> _logger;
    private readonly List<string> _tagNames;

    /// <summary>
    /// Creates the service and reads the tag list from configuration.
    /// </summary>
    /// <param name="client">Client used to connect and subscribe.</param>
    /// <param name="logger">Logger for change notifications and errors.</param>
    /// <param name="config">Configuration providing the <c>Tags</c> list.</param>
    /// <exception cref="ArgumentException">The <c>Tags</c> section is missing.</exception>
    public RealtimeCollectionService(
        CaffeineClient client,
        ILogger<RealtimeCollectionService> logger,
        IConfiguration config)
    {
        _client = client;
        _logger = logger;
        _tagNames = config.GetSection("Tags").Get<List<string>>()
            ?? throw new ArgumentException("Tags configuration missing");
    }

    /// <summary>
    /// Connects, subscribes to every configured tag, then idles until the host
    /// requests shutdown and disconnects.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!await _client.ConnectAsync(stoppingToken))
        {
            _logger.LogError("Failed to connect to Caffeine Server");
            return;
        }

        try
        {
            // Subscribe to real-time changes
            foreach (var tagName in _tagNames)
            {
                await _client.SubscribeTagAsync(tagName, OnTagChanged, stoppingToken);
            }

            // Keep service running
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: the infinite delay ends by cancellation.
        }
        finally
        {
            await _client.DisconnectAsync();
        }
    }

    /// <summary>
    /// Callback passed to <c>SubscribeTagAsync</c>; logs the new value.
    /// </summary>
    /// <param name="tagValue">The value carried by the notification.</param>
    private void OnTagChanged(TagValue tagValue)
    {
        _logger.LogInformation("Changed: {TagName} = {Value}",
            tagValue.TagName, tagValue.Value);

        // Process real-time data
    }
}

로컬 버퍼링

/// <summary>
/// Collection service variant that keeps read values in a bounded in-memory
/// queue and uploads them in a separate flush step.
/// </summary>
/// <remarks>
/// This is an excerpt: <c>_client</c>, <c>_tagNames</c>, <c>SaveToCloudAsync</c>
/// and the <c>ExecuteAsync</c> override are used or required but not declared here.
/// </remarks>
public class BufferedCollectionService : BackgroundService
{
    private readonly Queue<TagValue> _buffer = new();

    // Guards _buffer; a SemaphoreSlim is used because the lock is held across awaits.
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    // Upper bound on buffered values; the oldest entry is dropped when full.
    private const int MaxBufferSize = 10000;

    /// <summary>
    /// Batch-reads the configured tags and appends every value to the buffer,
    /// evicting the oldest entries once <see cref="MaxBufferSize"/> is reached.
    /// </summary>
    private async Task CollectDataAsync(CancellationToken cancellationToken)
    {
        // Read before taking the lock so the buffer is not blocked during the call.
        var values = await _client.ReadTagsAsync(_tagNames.ToArray(), cancellationToken);

        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            foreach (var (_, tagValue) in values)
            {
                if (_buffer.Count >= MaxBufferSize)
                {
                    _buffer.Dequeue(); // Remove oldest
                }
                _buffer.Enqueue(tagValue);
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Uploads buffered values in FIFO order until the buffer is empty.
    /// </summary>
    /// <remarks>
    /// A value is removed only after its save call returns, so a failed or
    /// cancelled save leaves it at the head of the queue for the next flush.
    /// </remarks>
    private async Task FlushBufferAsync(CancellationToken cancellationToken)
    {
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            while (_buffer.TryPeek(out var tagValue))
            {
                await SaveToCloudAsync(tagValue, cancellationToken);
                _buffer.Dequeue();
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

📚 참고 자료


작성일: 2026-01-28
버전: 2.0