Data Import Guide: Connecting to Industrial Data Sources

Overview of Supported Data Sources
The Industrie-Hub supports the following data sources:
| Source | Type | Protocol | Port |
|---|---|---|---|
| InfluxDB | Time-series DB | HTTP/HTTPS | 8086 |
| ClickHouse | Analytics DB | HTTP/Native | 8123/9000 |
| OPC UA Server | Industrial protocol | opc.tcp | 4840 |
| MQTT Broker | Message queue | TCP/TLS | 1883/8883 |
| REST APIs | Web | HTTP/HTTPS | 80/443 |
| MCP Server | Model Context | stdio/HTTP | variable |
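For orientation, the endpoints above can be kept in one configuration object. The following sketch is purely illustrative; all hosts, ports, and credential names are placeholders, not part of this guide:

# Hypothetical unified connection settings for the sources listed above
CONNECTIONS = {
    "influxdb":   {"url": "http://localhost:8086", "org": "stromfee", "bucket": "netzanalyse"},
    "clickhouse": {"host": "localhost", "port": 8123, "user": "default"},
    "opcua":      {"endpoint": "opc.tcp://192.168.1.100:4840"},
    "mqtt":       {"broker": "broker.hivemq.com", "port": 1883, "topic": "factory/#"},
    "rest":       {"base_url": "https://api.openweathermap.org/data/2.5"},
}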
1. InfluxDB Connection
Configuration
from influxdb_client import InfluxDBClient

# Connection parameters
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "your-token"
INFLUX_ORG = "stromfee"
INFLUX_BUCKET = "netzanalyse"

client = InfluxDBClient(
    url=INFLUX_URL,
    token=INFLUX_TOKEN,
    org=INFLUX_ORG
)
Writing Data
from influxdb_client import Point
from influxdb_client.client.write_api import SYNCHRONOUS

write_api = client.write_api(write_options=SYNCHRONOUS)

point = Point("messwert") \
    .tag("sensor", "temperatur_1") \
    .tag("anlage", "kompressor") \
    .field("value", 23.5) \
    .field("unit", "celsius")
write_api.write(bucket=INFLUX_BUCKET, record=point)
Querying Data (Flux)
query_api = client.query_api()

query = '''
from(bucket: "netzanalyse")
  |> range(start: -1h)  # Flux requires a time range
  |> filter(fn: (r) => r._measurement == "messwert")
'''

tables = query_api.query(query)
for table in tables:
    for record in table.records:
        print(f"{record.get_time()}: {record.get_value()}")
2. ClickHouse Connection
Configuration
import clickhouse_connect

client = clickhouse_connect.get_client(
    host='localhost',
    port=8123,
    username='default',
    password=''
)
Creating the Table
CREATE TABLE IF NOT EXISTS sensor_data (
    timestamp DateTime,
    sensor_id String,
    measurement String,
    value Float64,
    unit String
) ENGINE = MergeTree()
ORDER BY (sensor_id, timestamp)
Inserting Data
from datetime import datetime

data = [
    (datetime(2024, 12, 14, 10, 0), 'temp_1', 'temperatur', 23.5, 'celsius'),
    (datetime(2024, 12, 14, 10, 0), 'temp_2', 'temperatur', 24.1, 'celsius'),
]

client.insert('sensor_data', data,
              column_names=['timestamp', 'sensor_id', 'measurement', 'value', 'unit'])
Querying
result = client.query('''
    SELECT
        sensor_id,
        avg(value) AS avg_temp,
        max(value) AS max_temp
    FROM sensor_data
    WHERE timestamp > now() - INTERVAL 1 DAY
    GROUP BY sensor_id
''')

for row in result.result_rows:
    print(row)
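clickhouse-connect can also return results as a pandas DataFrame via query_df; a short sketch against the same table (pandas must be installed):

df = client.query_df('SELECT sensor_id, avg(value) AS avg_temp FROM sensor_data GROUP BY sensor_id')
print(df)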
3. OPC UA Client
Installation
pip install opcua
or asyncua for async support (an async sketch follows the synchronous example below)
pip install asyncua
Connecting and Reading Data
from opcua import Client

def read_opc_ua_data(endpoint: str, node_ids: list):
    """Read data from an OPC UA server."""
    client = Client(endpoint)
    try:
        client.connect()
        results = {}
        for node_id in node_ids:
            node = client.get_node(node_id)
            value = node.get_value()
            name = node.get_browse_name().Name
            results[name] = {
                'value': value,
                'node_id': node_id,
                'data_type': type(value).__name__
            }
        return results
    finally:
        client.disconnect()
Example
data = read_opc_ua_data(
    "opc.tcp://192.168.1.100:4840",
    ["ns=2;i=2", "ns=2;i=3", "ns=2;i=4"]
)
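For the asyncua package mentioned under Installation, an equivalent async read could look like this; a minimal sketch, assuming the same endpoint and node IDs:

from asyncua import Client as AsyncClient

async def read_opc_ua_data_async(endpoint: str, node_ids: list) -> dict:
    """Read values from an OPC UA server using asyncua."""
    results = {}
    async with AsyncClient(url=endpoint) as client:  # connects and disconnects automatically
        for node_id in node_ids:
            node = client.get_node(node_id)
            results[node_id] = await node.read_value()
    return results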
Browsing the Address Space
def browse_opc_ua(endpoint: str):
    """Browse the OPC UA address space."""
    client = Client(endpoint)
    client.connect()
    try:
        root = client.get_root_node()        # root of the address space
        objects = client.get_objects_node()  # Objects folder, typical browse entry point

        def browse_recursive(node, level=0):
            for child in node.get_children():
                name = child.get_browse_name().Name
                node_class = child.get_node_class()
                print(" " * level + f"{name} ({node_class})")
                if level < 3:  # limit recursion depth
                    browse_recursive(child, level + 1)

        browse_recursive(objects)
    finally:
        client.disconnect()
4. MQTT Subscriber
Installation
pip install paho-mqtt
MQTT Client
import json
from datetime import datetime

import paho.mqtt.client as mqtt

class MQTTDataCollector:
    def __init__(self, broker: str, port: int = 1883):
        self.broker = broker
        self.port = port
        # Note: paho-mqtt >= 2.0 requires a callback API version as the first argument,
        # e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        self.client = mqtt.Client()
        self.data_buffer = []
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, rc):
        print(f"Connected with result code {rc}")
        # Subscribe to all machine topics
        client.subscribe("factory/#")

    def _on_message(self, client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode())
            self.data_buffer.append({
                'topic': msg.topic,
                'payload': payload,
                'timestamp': datetime.now().isoformat()
            })
        except json.JSONDecodeError:
            # Keep the raw payload as a string
            self.data_buffer.append({
                'topic': msg.topic,
                'payload': msg.payload.decode(),
                'timestamp': datetime.now().isoformat()
            })

    def connect(self):
        self.client.connect(self.broker, self.port)
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

    def get_data(self):
        data = self.data_buffer.copy()
        self.data_buffer.clear()
        return data
Example
collector = MQTTDataCollector("broker.hivemq.com")
collector.connect()
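Because the collector buffers messages in the background network loop, a consumer typically polls get_data() periodically; a minimal sketch of such a loop (the 10-second interval is arbitrary):

import time

try:
    while True:
        time.sleep(10)  # collect for 10 seconds
        for item in collector.get_data():
            print(item['timestamp'], item['topic'], item['payload'])
except KeyboardInterrupt:
    collector.disconnect()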
5. REST API Integration
Generic API Client
import httpx
from typing import Optional, Dict, Any

class APIConnector:
    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.headers = {}
        if api_key:
            self.headers['Authorization'] = f'Bearer {api_key}'

    async def get(self, endpoint: str, params: Dict = None) -> Dict[str, Any]:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/{endpoint}",
                params=params,
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()

    async def post(self, endpoint: str, data: Dict) -> Dict[str, Any]:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/{endpoint}",
                json=data,
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()
Example: Weather Data API
weather_api = APIConnector("https://api.openweathermap.org/data/2.5", "your-api-key")
data = await weather_api.get("weather", {"q": "Berlin", "units": "metric"})  # inside an async function
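Because both methods are coroutines, the call above has to run inside an event loop; a minimal runnable wrapper, assuming the same APIConnector and a valid API key:

import asyncio

async def main():
    weather_api = APIConnector("https://api.openweathermap.org/data/2.5", "your-api-key")
    data = await weather_api.get("weather", {"q": "Berlin", "units": "metric"})
    print(data)

asyncio.run(main())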
6. MCP (Model Context Protocol) Integration
What is MCP?
MCP connects LLMs to external data sources and tools in a standardized format.
MCP Server for Data Sources
# Schematic sketch: the MCPServer/Tool/Resource interface shown here is illustrative,
# not the exact API of the official MCP Python SDK (see the FastMCP sketch below).
from mcp import MCPServer, Tool, Resource

class IndustrieDataMCP(MCPServer):
    def __init__(self):
        super().__init__()
        self.register_tools()
        self.register_resources()

    def register_tools(self):
        @self.tool("query_influxdb")
        async def query_influx(query: str) -> str:
            """Run a Flux query against InfluxDB."""
            # Implementation
            pass

        @self.tool("read_opc_ua")
        async def read_opc(endpoint: str, node_id: str) -> dict:
            """Read a value from an OPC UA node."""
            # Implementation
            pass

    def register_resources(self):
        @self.resource("influxdb://buckets")
        async def list_buckets():
            """List all InfluxDB buckets."""
            pass
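For reference, the official MCP Python SDK currently exposes a FastMCP helper with decorator-based tools and resources; a minimal sketch of the same idea (tool and resource bodies are left as stubs, as above):

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("industrie-data")

@mcp.tool()
async def query_influxdb(query: str) -> str:
    """Run a Flux query against InfluxDB."""
    return "not implemented"  # stub

@mcp.resource("influxdb://buckets")
async def list_buckets() -> str:
    """List all InfluxDB buckets."""
    return "netzanalyse"  # stub

if __name__ == "__main__":
    mcp.run()  # serves over stdio by default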
7. Data Import Pipeline
Unified Import Schema
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime

@dataclass
class DataPoint:
    """Unified format for all data sources."""
    source: str          # e.g. "influxdb", "opcua", "mqtt"
    measurement: str     # e.g. "temperatur"
    value: Any
    timestamp: datetime
    tags: dict = None    # additional metadata
    unit: Optional[str] = None

class DataImporter:
    def __init__(self):
        self.connectors = {}

    def register_connector(self, name: str, connector):
        self.connectors[name] = connector

    async def import_data(self, source: str, **kwargs) -> list[DataPoint]:
        if source not in self.connectors:
            raise ValueError(f"Unknown source: {source}")
        connector = self.connectors[source]
        raw_data = await connector.fetch(**kwargs)
        # Normalize to DataPoint
        return [self._normalize(source, item) for item in raw_data]

    def _normalize(self, source: str, data: dict) -> DataPoint:
        # Mapping logic depends on the source
        pass
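As an illustration of the mapping step, a hypothetical _normalize branch for MQTT messages could replace the stub above; the payload keys 'measurement', 'value', and 'unit' are assumptions about the message format, not part of this guide:

    def _normalize(self, source: str, data: dict) -> DataPoint:
        # Hypothetical mapping for messages buffered by MQTTDataCollector
        if source == "mqtt":
            payload = data.get('payload', {})
            return DataPoint(
                source=source,
                measurement=payload.get('measurement', data.get('topic', 'unknown')),
                value=payload.get('value'),
                timestamp=datetime.fromisoformat(data['timestamp']),
                tags={'topic': data.get('topic')},
                unit=payload.get('unit'),
            )
        raise NotImplementedError(f"No mapping for source: {source}")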
8. Best Practices
Connection Pooling
from contextlib import asynccontextmanager

class ConnectionPool:
    def __init__(self):
        self._influx = None
        self._clickhouse = None

    @asynccontextmanager
    async def influx(self):
        if not self._influx:
            self._influx = InfluxDBClient(...)
        try:
            yield self._influx
        finally:
            pass  # connection stays open for reuse

    def close_all(self):
        if self._influx:
            self._influx.close()
        if self._clickhouse:
            self._clickhouse.close()
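Callers then borrow the shared client through the context manager instead of creating their own; a short usage sketch, assuming the pool above and the Flux query from section 1:

pool = ConnectionPool()

async def fetch_measurements():
    async with pool.influx() as client:
        return client.query_api().query('from(bucket: "netzanalyse") |> range(start: -1h)')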
Error Handling
import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def fetch_with_retry(connector, **kwargs):
    try:
        return await connector.fetch(**kwargs)
    except ConnectionError as e:
        logger.warning(f"Connection failed, retrying: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
Security
- NEVER put API keys and tokens in code
- Use environment variables or a secrets manager (see the sketch below)
- Use TLS/SSL for all connections
- Apply the principle of least privilege for database access
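A minimal sketch of loading credentials from environment variables instead of hard-coding them; the variable names INFLUX_TOKEN and CLICKHOUSE_PASSWORD are arbitrary examples:

import os

INFLUX_TOKEN = os.environ["INFLUX_TOKEN"]                        # fails fast if missing
CLICKHOUSE_PASSWORD = os.environ.get("CLICKHOUSE_PASSWORD", "")  # optional, with default

client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)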
Tags: #InfluxDB #ClickHouse #OPCUA #MQTT #Datenimport #TimeSeries #API #Python #MCP #Stromfee
Generated with Stromfee Academy | Image: Leonardo AI Flux