← Zurück zur Übersicht

Datenimport-Guide: Verbindungen zu Industriellen Datenquellen

Datenimport-Guide

Übersicht der unterstützten Datenquellen

Der Industrie-Hub unterstützt folgende Datenquellen:

| Quelle | Typ | Protokoll | Port |
|---|---|---|---|
| InfluxDB | Zeitreihen-DB | HTTP/HTTPS | 8086 |
| ClickHouse | Analytics-DB | HTTP/Native | 8123/9000 |
| OPC UA Server | Industrieprotokoll | opc.tcp | 4840 |
| MQTT Broker | Message Queue | TCP/TLS | 1883/8883 |
| REST APIs | Web | HTTP/HTTPS | 80/443 |
| MCP Server | Model Context | stdio/HTTP | variabel |

1. InfluxDB Verbindung

Konfiguration

import os

from influxdb_client import InfluxDBClient

# Connection parameters. Values are read from the environment so that secrets
# such as the token never live in the code; the literal defaults are only
# placeholders for a local test setup.
INFLUX_URL = os.environ.get("INFLUX_URL", "http://localhost:8086")
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "your-token")
INFLUX_ORG = os.environ.get("INFLUX_ORG", "stromfee")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "netzanalyse")

client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)

Daten schreiben

from influxdb_client import Point
from influxdb_client.client.write_api import SYNCHRONOUS

# SYNCHRONOUS write mode: write() sends the data in the call itself
# instead of batching it in the background.
write_api = client.write_api(write_options=SYNCHRONOUS)

# The chain is wrapped in parentheses: a backslash continuation must be the
# last character of a physical line, so "\ .tag(...)" on one line is a
# SyntaxError.
point = (
    Point("messwert")
    .tag("sensor", "temperatur_1")
    .tag("anlage", "kompressor")
    .field("value", 23.5)
    .field("unit", "celsius")
)

write_api.write(bucket=INFLUX_BUCKET, record=point)

Daten abfragen (Flux)

query_api = client.query_api()

# InfluxDB only accepts time-bounded Flux queries: from() must be followed by
# range(). The filters select the "value" field of the "messwert"
# measurement written in the example above.
query = f'''
from(bucket: "{INFLUX_BUCKET}")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "messwert")
  |> filter(fn: (r) => r._field == "value")
'''

# The result is a list of tables; each table holds the records of one series.
tables = query_api.query(query)
for table in tables:
    for record in table.records:
        print(f"{record.get_time()}: {record.get_value()}")

2. ClickHouse Verbindung

Konfiguration

import clickhouse_connect

# Connect via the HTTP port (8123) as user "default" with an empty password.
client = clickhouse_connect.get_client(
    host='localhost',
    port=8123,
    username='default',
    password='',
)

Tabelle erstellen

-- One row per sensor reading; created only if the table does not exist yet.
CREATE TABLE IF NOT EXISTS sensor_data (
    timestamp DateTime,
    sensor_id String,
    measurement String,  -- e.g. 'temperatur'
    value Float64,
    unit String          -- e.g. 'celsius'
) ENGINE = MergeTree()
-- Sorting key: rows are stored ordered by sensor, then by time.
ORDER BY (sensor_id, timestamp)

Daten einfügen

from datetime import datetime

# clickhouse_connect serialises DateTime columns from datetime objects,
# so the timestamps are passed as datetimes rather than
# 'YYYY-MM-DD HH:MM:SS' strings.
data = [
    (datetime(2024, 12, 14, 10, 0, 0), 'temp_1', 'temperatur', 23.5, 'celsius'),
    (datetime(2024, 12, 14, 10, 0, 0), 'temp_2', 'temperatur', 24.1, 'celsius'),
]

# column_names maps the tuple positions to table columns.
client.insert(
    'sensor_data',
    data,
    column_names=['timestamp', 'sensor_id', 'measurement', 'value', 'unit'],
)

Abfragen

# Average and maximum value per sensor over the last day.
result = client.query('''
    SELECT
        sensor_id,
        avg(value) as avg_temp,
        max(value) as max_temp
    FROM sensor_data
    WHERE timestamp > now() - INTERVAL 1 DAY
    GROUP BY sensor_id
''')

rows = result.result_rows
for sensor_row in rows:
    print(sensor_row)

3. OPC UA Client

Installation

pip install opcua

oder asyncua für async Support

pip install asyncua

Verbindung und Daten lesen

from opcua import Client

def read_opc_ua_data(endpoint: str, node_ids: list[str]) -> dict[str, dict]:
    """Read the current value of several nodes from an OPC UA server.

    Args:
        endpoint: Server URL, e.g. ``"opc.tcp://192.168.1.100:4840"``.
        node_ids: Node identifiers such as ``"ns=2;i=2"``.

    Returns:
        A dict keyed by each node's browse name. Each entry holds ``value``,
        ``node_id`` and ``data_type`` (the Python type name of the value).
        Nodes that share a browse name overwrite each other.
    """
    client = Client(endpoint)
    # Connect before the try block: if connect() fails there is no session to
    # close, and disconnect() on an unconnected client can raise and mask the
    # original error.
    client.connect()
    try:
        results = {}
        for node_id in node_ids:
            node = client.get_node(node_id)
            value = node.get_value()
            results[node.get_browse_name().Name] = {
                'value': value,
                'node_id': node_id,
                'data_type': type(value).__name__,
            }
        return results
    finally:
        client.disconnect()


# Example
data = read_opc_ua_data(
    "opc.tcp://192.168.1.100:4840",
    ["ns=2;i=2", "ns=2;i=3", "ns=2;i=4"],
)

Browse Adressraum

def browse_opc_ua(endpoint: str, max_depth: int = 3) -> None:
    """Print the OPC UA address space below the Objects folder as a tree.

    Args:
        endpoint: Server URL, e.g. ``"opc.tcp://192.168.1.100:4840"``.
        max_depth: Deepest level that is still expanded. Level 0 is the
            direct children of the Objects folder, so levels 0..max_depth
            are printed. The default of 3 matches the previous fixed limit.
    """
    client = Client(endpoint)
    client.connect()

    try:
        def browse_recursive(node, level=0):
            for child in node.get_children():
                name = child.get_browse_name().Name
                node_class = child.get_node_class()
                print(" " * level + f"{name} ({node_class})")

                # Limit the depth so large servers do not produce huge output.
                if level < max_depth:
                    browse_recursive(child, level + 1)

        browse_recursive(client.get_objects_node())
    finally:
        client.disconnect()

4. MQTT Subscriber

Installation

# MQTT client library (imported as paho.mqtt.client below)
pip install paho-mqtt

MQTT Client

import paho.mqtt.client as mqtt
import json

class MQTTDataCollector:
    """Collect messages from all ``factory/#`` topics into an in-memory buffer.

    ``connect()`` starts paho's network loop in a background thread
    (``loop_start``). Messages are therefore appended from that thread while
    ``get_data()`` runs in the caller's thread, so the buffer is guarded by a
    lock.
    """

    def __init__(self, broker: str, port: int = 1883):
        import threading  # local import keeps this snippet self-contained

        self.broker = broker
        self.port = port
        # paho-mqtt >= 2.0 requires an explicit callback API version. Older
        # releases have no CallbackAPIVersion and take no argument.
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is not None:
            self.client = mqtt.Client(api_version.VERSION2)
        else:
            self.client = mqtt.Client()
        self.data_buffer = []
        self._lock = threading.Lock()

        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """Connect callback; ``properties`` is only passed by the 2.x API."""
        print(f"Connected with result code {rc}")
        # Subscribe to all machine topics. Doing it here also restores the
        # subscription after a reconnect.
        client.subscribe("factory/#")

    def _on_message(self, client, userdata, msg):
        """Buffer one message; JSON payloads are decoded, others kept as text."""
        from datetime import datetime

        # errors="replace": a non-UTF-8 payload must not raise inside the
        # network-loop callback.
        text = msg.payload.decode(errors="replace")
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            # Not JSON: keep the raw string
            payload = text
        entry = {
            'topic': msg.topic,
            'payload': payload,
            'timestamp': datetime.now().isoformat(),
        }
        with self._lock:
            self.data_buffer.append(entry)

    def connect(self):
        """Connect to the broker and start the background network loop."""
        self.client.connect(self.broker, self.port)
        self.client.loop_start()

    def disconnect(self):
        """Stop the network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()

    def get_data(self):
        """Return all buffered messages and empty the buffer.

        Copy and clear happen under the lock, so no message that arrives in
        between is lost.
        """
        with self._lock:
            data = self.data_buffer.copy()
            self.data_buffer.clear()
        return data

# Example: start collecting in the background. Call collector.get_data()
# periodically to fetch the buffered messages and collector.disconnect()
# when done.
collector = MQTTDataCollector("broker.hivemq.com")
collector.connect()

5. REST API Integration

Generischer API Client

import httpx
from typing import Optional, Dict, Any

class APIConnector:
    """Minimal async JSON client for REST APIs with optional Bearer auth.

    NOTE: every request opens a fresh ``httpx.AsyncClient``. That is simple,
    but it does not reuse connections between calls.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.headers = {}

        if api_key:
            self.headers['Authorization'] = f'Bearer {api_key}'

    def _url(self, endpoint: str) -> str:
        """Join base URL and endpoint with exactly one slash between them."""
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    async def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: if the server answers with 4xx/5xx.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                self._url(endpoint), params=params, headers=self.headers
            )
            response.raise_for_status()
            return response.json()

    async def post(self, endpoint: str, data: Dict) -> Dict[str, Any]:
        """Send ``data`` as a JSON POST body and return the decoded JSON reply.

        Raises:
            httpx.HTTPStatusError: if the server answers with 4xx/5xx.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._url(endpoint), json=data, headers=self.headers
            )
            response.raise_for_status()
            return response.json()

# Example: weather data API.
# OpenWeatherMap expects the key as the `appid` query parameter, not as a
# Bearer header, so it goes into the request parameters.
import asyncio
import os


async def main():
    weather_api = APIConnector("https://api.openweathermap.org/data/2.5")
    return await weather_api.get(
        "weather",
        {
            "q": "Berlin",
            "units": "metric",
            "appid": os.environ.get("OPENWEATHER_API_KEY", "your-api-key"),
        },
    )


# `await` is only valid inside an async function; asyncio.run drives it.
data = asyncio.run(main())

6. MCP (Model Context Protocol) Integration

Was ist MCP?

MCP ermöglicht die Anbindung von LLMs an externe Datenquellen und Tools in einem standardisierten Format.

MCP Server für Datenquellen

from mcp import MCPServer, Tool, Resource

class IndustrieDataMCP(MCPServer):
    """MCP server exposing the industrial data sources as tools and resources.

    NOTE(review): assumes the imported ``MCPServer`` base class provides
    ``tool(name)`` and ``resource(uri)`` decorators. The official ``mcp``
    Python SDK exposes ``FastMCP`` (``mcp.server.fastmcp``) with
    ``@mcp.tool()`` / ``@mcp.resource(uri)`` instead. Confirm this against
    the SDK version in use.
    """

    def __init__(self):
        super().__init__()
        self.register_tools()
        self.register_resources()

    def register_tools(self):
        """Register the tools an LLM client may call."""
        # The German docstrings are kept verbatim: MCP frameworks typically
        # publish them as the tool descriptions.
        @self.tool("query_influxdb")
        async def query_influx(query: str) -> str:
            """Führe Flux-Query auf InfluxDB aus"""
            # Placeholder: fail loudly instead of silently returning None,
            # which would contradict the declared return type.
            raise NotImplementedError("query_influxdb is not implemented yet")

        @self.tool("read_opc_ua")
        async def read_opc(endpoint: str, node_id: str) -> dict:
            """Lese Wert von OPC UA Node"""
            raise NotImplementedError("read_opc_ua is not implemented yet")

    def register_resources(self):
        """Register the read-only resources offered to LLM clients."""
        @self.resource("influxdb://buckets")
        async def list_buckets():
            """Liste alle InfluxDB Buckets"""
            raise NotImplementedError("influxdb://buckets is not implemented yet")

7. Datenimport-Pipeline

Unified Import Schema

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime

@dataclass
class DataPoint:
    """Unified record format shared by all data sources."""

    source: str  # e.g. "influxdb", "opcua", "mqtt"
    measurement: str  # e.g. "temperatur"
    value: Any
    timestamp: datetime
    tags: Optional[dict] = None  # additional metadata; None if not provided
    unit: Optional[str] = None

class DataImporter:
    """Fetch raw records from registered connectors and normalise them.

    A connector is any object with an async ``fetch(**kwargs)`` method that
    returns an iterable of raw records. The mapping to DataPoint differs per
    source, so each connector can be registered with a ``normalizer``
    callable. Alternatively, a subclass can override ``_normalize``.
    """

    def __init__(self):
        self.connectors = {}
        self.normalizers = {}

    def register_connector(self, name: str, connector, normalizer=None):
        """Register ``connector`` under ``name``.

        Args:
            name: Source name used later in ``import_data``.
            connector: Object with an async ``fetch(**kwargs)`` method.
            normalizer: Optional callable that maps one raw record to a
                DataPoint.
        """
        self.connectors[name] = connector
        if normalizer is not None:
            self.normalizers[name] = normalizer

    async def import_data(self, source: str, **kwargs) -> list[DataPoint]:
        """Fetch from ``source`` (kwargs go to ``fetch``) and normalise every record.

        Raises:
            ValueError: if no connector is registered under ``source``.
            NotImplementedError: if no normaliser exists for ``source``.
        """
        if source not in self.connectors:
            raise ValueError(f"Unknown source: {source}")

        connector = self.connectors[source]
        raw_data = await connector.fetch(**kwargs)

        # Normalise to DataPoint
        return [self._normalize(source, item) for item in raw_data]

    def _normalize(self, source: str, data: dict) -> DataPoint:
        """Map one raw record of ``source`` to a DataPoint via its normaliser."""
        normalizer = self.normalizers.get(source)
        if normalizer is None:
            # Previously a silent stub returning None; fail explicitly instead.
            raise NotImplementedError(f"No normalizer registered for source: {source}")
        return normalizer(data)

8. Best Practices

Verbindungs-Pooling

from contextlib import asynccontextmanager

class ConnectionPool:
    """Lazily created client connections shared until ``close_all()``."""

    def __init__(self):
        self._influx = None
        # NOTE(review): no factory for the ClickHouse client is shown here;
        # assign it analogously to influx().
        self._clickhouse = None

    @asynccontextmanager
    async def influx(self):
        """Yield the shared InfluxDB client, creating it on first use.

        The client is deliberately not closed when the ``async with`` block
        exits, so later callers reuse it. Call ``close_all()`` on shutdown.
        """
        if self._influx is None:
            self._influx = InfluxDBClient(...)  # placeholder: pass url/token/org
        yield self._influx

    def close_all(self):
        """Close all open clients and forget them, so the next use reconnects."""
        if self._influx is not None:
            self._influx.close()
            self._influx = None
        if self._clickhouse is not None:
            self._clickhouse.close()
            self._clickhouse = None

Error Handling

import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)


def _is_connection_error(retry_state) -> bool:
    """tenacity retry predicate: retry only if the attempt raised ConnectionError."""
    outcome = retry_state.outcome
    return (
        outcome is not None
        and outcome.failed
        and isinstance(outcome.exception(), ConnectionError)
    )


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=10),
    retry=_is_connection_error,
    # Re-raise the last ConnectionError instead of wrapping it in RetryError.
    reraise=True,
)
async def fetch_with_retry(connector, **kwargs):
    """Call ``connector.fetch(**kwargs)`` and retry transient connection errors.

    Makes at most 3 attempts with exponential back-off (1-10 s). Only
    ConnectionError is retried; any other exception is logged with its
    traceback and propagates immediately.
    """
    try:
        return await connector.fetch(**kwargs)
    except ConnectionError as e:
        logger.warning("Connection failed, retrying: %s", e)
        raise
    except Exception as e:
        logger.exception("Unexpected error: %s", e)
        raise

Sicherheit

  • API-Keys und Tokens NIEMALS im Code
  • Umgebungsvariablen oder Secrets Manager nutzen
  • TLS/SSL für alle Verbindungen
  • Principle of Least Privilege für Datenbankzugriffe

Tags: #InfluxDB #ClickHouse #OPCUA #MQTT #Datenimport #TimeSeries #API #Python #MCP #Stromfee

Generiert mit Stromfee Academy | Bild: Leonardo AI Flux

Im Chat nachfragen · Weitere Artikel