Apache NiFi ExecuteStreamCommand: Integrating External Scripts

Integrate Python, shell scripts, and external programs into NiFi dataflows



The ExecuteStreamCommand processor is one of the most powerful and versatile components in Apache NiFi, enabling seamless integration of Python scripts, shell scripts, compiled programs, and any other external executable into enterprise NiFi dataflows. This guide shows how to master the processor and build robust, scalable data architectures with it.

In this article
  • ExecuteStreamCommand architecture and stdin/stdout communication mechanisms
  • Advanced Python integration with error handling and structured logging
  • Architectural patterns for microservices and distributed processing
  • Enterprise configurations for production-ready dataflows
  • Error handling and resilience for mission-critical systems
  • Performance monitoring and tuning for operational optimization
  • Security and compliance for enterprise environments

Guide Index

Part I - Fundamentals and Setup

  1. ExecuteStreamCommand Architecture
  2. Basic Configuration
  3. Essential Python Integration

Part II - Advanced Python Integration

  1. Production-Ready Python Scripts
  2. Error Handling and Logging
  3. Performance and Optimization

Part III - Architectural Patterns

  1. Microservices Pattern

Part IV - Operations and Monitoring

  1. Monitoring and Alerting
  2. Security and Compliance
  3. Troubleshooting and Best Practices

ExecuteStreamCommand Architecture

Communication Mechanism

ExecuteStreamCommand implements a stdin/stdout communication pattern that enables bidirectional data exchange between NiFi and external processes:

┌─────────────────┐    stdin     ┌──────────────────┐    stdout    ┌──────────────────┐
│    FlowFile     │ ───────────► │ External Process │ ───────────► │   New FlowFile   │
│  (Input Data)   │              │  (Python/Shell)  │              │   (Processed)    │
└─────────────────┘              └──────────────────┘              └──────────────────┘
                                          │
                                   stderr ▼
                                 ┌──────────────────┐
                                 │   Error Stream   │
                                 │    (Logging)     │
                                 └──────────────────┘

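The contract behind this diagram is deliberately small: read the FlowFile content from stdin, write the new content to stdout, send diagnostics to stderr, and return exit code 0 on success. A minimal sketch of a conforming script (the filename and transformation are illustrative):

```python
#!/usr/bin/env python3
# minimal_echo_processor.py - the smallest useful ExecuteStreamCommand script:
# reads the FlowFile content from stdin, writes the new content to stdout,
# and logs to stderr (which NiFi captures for its logs)
import sys

def main() -> int:
    content = sys.stdin.read()                             # incoming FlowFile content
    print(f"read {len(content)} bytes", file=sys.stderr)   # diagnostics only
    sys.stdout.write(content.upper())                      # becomes the new FlowFile
    return 0                                               # non-zero routes to failure

```

Wire it up with `if __name__ == "__main__": sys.exit(main())`; NiFi routes the FlowFile to the success or failure relationship based on the exit code.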
Processor Lifecycle

# ExecuteStreamCommand lifecycle
class ExecuteStreamCommandLifecycle:
    """
    Represents the complete lifecycle of a single execution
    """

    def __init__(self):
        self.phases = [
            "process_initialization",
            "stdin_data_transfer",
            "external_execution",
            "stdout_capture",
            "stderr_handling",
            "flowfile_generation",
            "cleanup_resources"
        ]

    def execute_phase(self, phase_name, context):
        """Esegue una fase specifica del lifecycle"""
        if phase_name == "process_initialization":
            return self._initialize_external_process(context)
        elif phase_name == "stdin_data_transfer":
            return self._transfer_input_data(context)
        elif phase_name == "external_execution":
            return self._execute_external_command(context)
        # ... remaining phases

Critical Configuration

Essential Parameters

Parameter               Description                 Recommended Values
Command Path            Path to the executable      /usr/bin/python3, /bin/bash
Command Arguments       Command arguments           Script path and parameters
Working Directory       Working directory           A directory with appropriate permissions
Environment Variables   Environment variables       PATH, PYTHONPATH, custom variables

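These parameters map directly onto what the external script sees at runtime: each Command Arguments entry arrives as a separate `sys.argv` element (NiFi splits the property on its Argument Delimiter, `;` by default), and the processor's environment variables appear in `os.environ`. A minimal sketch, with illustrative variable names:

```python
#!/usr/bin/env python3
# args_env_demo.py - how Command Arguments and environment variables
# reach the external script; names here are illustrative
import os

def read_config(argv):
    """argv[0] is the script path itself; the remaining entries are the
    individual Command Arguments values."""
    return {
        "script_args": argv[1:],
        "log_level": os.environ.get("LOG_LEVEL", "INFO"),
    }
```

A processing script would call `read_config(sys.argv)` at startup.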
Advanced Configuration

# Performance settings
nifi.executestream.timeout=300s
nifi.executestream.buffer.size=1048576
nifi.executestream.max.threads=10

# Security settings
nifi.executestream.allowed.commands=/usr/bin/python3,/bin/bash
nifi.executestream.sandbox.enabled=true
nifi.executestream.user.restrictions=nifi-user

Basic Configuration

NiFi Processor Setup

<!-- ExecuteStreamCommand configuration template -->
<processor>
    <id>execute-stream-processor</id>
    <name>ExecuteStreamCommand</name>
    <class>org.apache.nifi.processors.standard.ExecuteStreamCommand</class>
    <properties>
        <property>
            <name>Command Path</name>
            <value>/usr/bin/python3</value>
        </property>
        <property>
            <name>Command Arguments</name>
            <value>/opt/nifi/scripts/data_processor.py</value>
        </property>
        <property>
            <name>Working Directory</name>
            <value>/opt/nifi/working</value>
        </property>
        <property>
            <name>Ignore STDIN</name>
            <value>false</value>
        </property>
        <property>
            <name>Ignore STDOUT</name>
            <value>false</value>
        </property>
        <property>
            <name>Redirect Error Stream</name>
            <value>false</value>
        </property>
    </properties>
    <relationships>
        <relationship>success</relationship>
        <relationship>failure</relationship>
    </relationships>
</processor>

Environment Setup

#!/bin/bash
# Setup script for the ExecuteStreamCommand environment

# Create the directory structure
mkdir -p /opt/nifi/{scripts,working,logs,config}

# Set appropriate permissions
chown -R nifi:nifi /opt/nifi/
chmod 755 /opt/nifi/scripts
chmod 777 /opt/nifi/working  # temporary working directory (tighten in production)
chmod 755 /opt/nifi/config   # directories need the execute bit to be traversable

# Configure the Python environment
python3 -m venv /opt/nifi/venv
source /opt/nifi/venv/bin/activate

# Install essential dependencies
pip install --upgrade pip
pip install pandas numpy requests psycopg2-binary

# Create the initialization script
cat > /opt/nifi/scripts/init_env.sh << 'EOF'
#!/bin/bash
export PYTHONPATH="/opt/nifi/scripts:$PYTHONPATH"
export NIFI_HOME="/opt/nifi"
export LOG_LEVEL="INFO"
EOF

chmod +x /opt/nifi/scripts/init_env.sh

Essential Python Integration

Base Script for ExecuteStreamCommand

#!/usr/bin/env python3
# /opt/nifi/scripts/base_processor.py
"""
Enterprise-ready base script for ExecuteStreamCommand
"""

import sys
import json
import logging
import traceback
from datetime import datetime
from typing import Dict, Any, Optional
import os

class NiFiStreamProcessor:
    """Processore base per integrazione NiFi ExecuteStreamCommand"""

    def __init__(self, log_level: str = "INFO"):
        self.setup_logging(log_level)
        self.start_time = datetime.now()
        self.metrics = {
            "records_processed": 0,
            "errors_count": 0,
            "processing_time": 0
        }

    def setup_logging(self, log_level: str):
        """Configura logging strutturato per NiFi"""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=getattr(logging, log_level.upper()),
            format=log_format,
            handlers=[
                logging.StreamHandler(sys.stderr),  # NiFi captures stderr for logging
                logging.FileHandler('/opt/nifi/logs/processor.log')
            ]
        )
        self.logger = logging.getLogger(self.__class__.__name__)

    def read_input(self) -> Optional[str]:
        """Legge input da stdin con error handling"""
        try:
            input_data = sys.stdin.read()
            if not input_data.strip():
                self.logger.warning("Received empty input from stdin")
                return None

            self.logger.info(f"Received input data: {len(input_data)} bytes")
            return input_data

        except Exception as e:
            self.logger.error(f"Error reading from stdin: {e}")
            self.metrics["errors_count"] += 1
            return None

    def write_output(self, data: Any) -> bool:
        """Scrive output a stdout con validazione"""
        try:
            if isinstance(data, (dict, list)):
                output = json.dumps(data, ensure_ascii=False, indent=None)
            else:
                output = str(data)

            print(output)  # stdout becomes the new FlowFile content
            sys.stdout.flush()

            self.logger.info(f"Written output: {len(output)} bytes")
            return True

        except Exception as e:
            self.logger.error(f"Error writing to stdout: {e}")
            self.metrics["errors_count"] += 1
            return False

    def process_data(self, input_data: str) -> Any:
        """Template method - da implementare nelle subclassi"""
        raise NotImplementedError("Subclasses must implement process_data method")

    def run(self):
        """Main execution method"""
        try:
            self.logger.info("Starting NiFi stream processing")

            # Read input
            input_data = self.read_input()
            if input_data is None:
                sys.exit(1)

            # Process the data
            result = self.process_data(input_data)

            # Write output
            if not self.write_output(result):
                sys.exit(1)

            # Compute final metrics
            self.metrics["processing_time"] = (
                datetime.now() - self.start_time
            ).total_seconds()

            self.logger.info(f"Processing completed successfully. Metrics: {self.metrics}")
            sys.exit(0)

        except Exception as e:
            self.logger.error(f"Fatal error in processing: {e}")
            self.logger.error(traceback.format_exc())
            self.metrics["errors_count"] += 1
            sys.exit(1)

class JSONProcessor(NiFiStreamProcessor):
    """Processore specifico per dati JSON"""

    def process_data(self, input_data: str) -> Any:
        """Elabora dati JSON con validazione e trasformazioni"""
        try:
            # Parse JSON input
            data = json.loads(input_data)
            self.logger.info(f"Parsed JSON with {len(data) if isinstance(data, list) else 1} records")

            # Transform the data
            if isinstance(data, list):
                processed_data = []
                for idx, record in enumerate(data):
                    processed_record = self.transform_record(record, idx)
                    processed_data.append(processed_record)
                    self.metrics["records_processed"] += 1
            else:
                processed_data = self.transform_record(data, 0)
                self.metrics["records_processed"] = 1

            return processed_data

        except json.JSONDecodeError as e:
            self.logger.error(f"Invalid JSON input: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Error processing data: {e}")
            raise

    def transform_record(self, record: Dict[str, Any], index: int) -> Dict[str, Any]:
        """Trasforma singolo record - da personalizzare"""
        # Aggiungi timestamp e metadata
        record["processed_at"] = datetime.now().isoformat()
        record["processed_by"] = "nifi_executestream"
        record["record_index"] = index

        # Business-logic validation
        if "id" not in record:
            record["id"] = f"generated_{index}"

        return record

if __name__ == "__main__":
    # Read configuration from environment variables
    log_level = os.getenv("LOG_LEVEL", "INFO")

    # Instantiate and run the processor
    processor = JSONProcessor(log_level)
    processor.run()
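Because the business logic in `transform_record` is independent of the stdin/stdout plumbing, it can be exercised in isolation. A standalone sketch of the same transformation, handy for unit tests:

```python
# standalone version of JSONProcessor.transform_record above, extracted so
# the business logic can be unit-tested without stdin/stdout plumbing
from datetime import datetime
from typing import Any, Dict

def transform_record(record: Dict[str, Any], index: int) -> Dict[str, Any]:
    out = dict(record)  # copy instead of mutating the caller's dict
    out["processed_at"] = datetime.now().isoformat()
    out["processed_by"] = "nifi_executestream"
    out["record_index"] = index
    if "id" not in out:  # same fallback as the script above
        out["id"] = f"generated_{index}"
    return out
```

Keeping transforms pure like this makes the ExecuteStreamCommand wrapper a thin shell around testable code.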

Advanced Data Transformation Script

#!/usr/bin/env python3
# /opt/nifi/scripts/advanced_transformer.py
"""
Transformer avanzato per casi d'uso enterprise complessi
"""

import pandas as pd
import numpy as np
from datetime import datetime
import requests
from typing import Dict, List, Any
import os
import hashlib

class EnterpriseDataTransformer(NiFiStreamProcessor):
    """Transformer enterprise con integrazione database e API"""

    def __init__(self, log_level: str = "INFO"):
        super().__init__(log_level)
        self.api_endpoints = {
            "validation": os.getenv("VALIDATION_API_URL", "http://localhost:8080/validate"),
            "enrichment": os.getenv("ENRICHMENT_API_URL", "http://localhost:8080/enrich")
        }
        self.batch_size = int(os.getenv("BATCH_SIZE", "1000"))

    def process_data(self, input_data: str) -> Any:
        """Processamento enterprise con validation e enrichment"""
        try:
            # Parse input data
            raw_data = json.loads(input_data)

            # Convert to a DataFrame for complex processing
            df = pd.DataFrame(raw_data)
            self.logger.info(f"Processing DataFrame with {len(df)} rows and {len(df.columns)} columns")

            # Transformation pipeline
            df = self.validate_data(df)
            df = self.enrich_data(df)
            df = self.apply_business_rules(df)
            df = self.calculate_aggregations(df)

            # Convert the result to the output format
            result = df.to_dict('records')
            self.metrics["records_processed"] = len(result)

            return {
                "data": result,
                "metadata": {
                    "processed_count": len(result),
                    "processing_timestamp": datetime.now().isoformat(),
                    "data_hash": self.calculate_data_hash(result)
                }
            }

        except Exception as e:
            self.logger.error(f"Error in enterprise processing: {e}")
            raise

    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validazione dati con regole business"""
        initial_count = len(df)

        # Remove duplicates
        df = df.drop_duplicates()

        # Validate required fields
        required_fields = ['id', 'timestamp', 'amount']
        for field in required_fields:
            if field not in df.columns:
                raise ValueError(f"Missing required field: {field}")

        # Filter out invalid records
        df = df[df['amount'].notna() & (df['amount'] > 0)]

        valid_count = len(df)
        self.logger.info(f"Data validation: {initial_count} -> {valid_count} records")

        return df

    def enrich_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Arricchimento dati tramite API esterne"""
        if len(df) == 0:
            return df

        try:
            # Process in batches for performance
            enriched_batches = []

            for i in range(0, len(df), self.batch_size):
                batch = df.iloc[i:i+self.batch_size]
                enriched_batch = self._enrich_batch(batch)
                enriched_batches.append(enriched_batch)

            return pd.concat(enriched_batches, ignore_index=True)

        except Exception as e:
            self.logger.warning(f"Enrichment failed, proceeding without: {e}")
            return df

    def _enrich_batch(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        """Arricchisce singolo batch tramite API"""
        try:
            # Prepare the API payload
            payload = {
                "records": batch_df.to_dict('records'),
                "request_id": hashlib.md5(str(datetime.now()).encode()).hexdigest()
            }

            # Call the enrichment API
            response = requests.post(
                self.api_endpoints["enrichment"],
                json=payload,
                timeout=30
            )

            if response.status_code == 200:
                enriched_data = response.json()["data"]
                return pd.DataFrame(enriched_data)
            else:
                self.logger.warning(f"Enrichment API returned {response.status_code}")
                return batch_df

        except Exception as e:
            self.logger.warning(f"Batch enrichment failed: {e}")
            return batch_df

    def apply_business_rules(self, df: pd.DataFrame) -> pd.DataFrame:
        """Applica regole business specifiche"""
        # Calcola categorie basate su amount
        df['category'] = pd.cut(
            df['amount'],
            bins=[0, 100, 1000, 10000, float('inf')],
            labels=['small', 'medium', 'large', 'enterprise']
        )

        # Add flags for special processing
        df['requires_approval'] = df['amount'] > 10000
        df['risk_score'] = np.random.uniform(0, 1, len(df))  # placeholder for an ML model

        # Timestamp formatting
        df['processed_date'] = datetime.now().strftime('%Y-%m-%d')

        return df

    def calculate_aggregations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calcola aggregazioni e statistiche"""
        if len(df) == 0:
            return df

        # Add per-category statistics
        category_stats = df.groupby('category')['amount'].agg(['count', 'sum', 'mean']).reset_index()
        category_stats.columns = ['category', 'count', 'total_amount', 'avg_amount']

        # Merge the statistics back into the original DataFrame
        df = df.merge(category_stats, on='category', how='left')

        return df

    def calculate_data_hash(self, data: List[Dict]) -> str:
        """Calcola hash per data integrity"""
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(data_str.encode()).hexdigest()

if __name__ == "__main__":
    processor = EnterpriseDataTransformer(os.getenv("LOG_LEVEL", "INFO"))
    processor.run()
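The `pd.cut` call in `apply_business_rules` uses right-inclusive bins, so an amount of exactly 100 still lands in `small`. A self-contained check of the bucketing:

```python
import pandas as pd

amounts = pd.Series([50, 100, 500, 5000, 50000])
categories = pd.cut(
    amounts,
    bins=[0, 100, 1000, 10000, float("inf")],
    labels=["small", "medium", "large", "enterprise"],
)
# intervals are (0, 100], (100, 1000], ... so 100 is still 'small'
print(list(categories))
# → ['small', 'small', 'medium', 'large', 'enterprise']
```

Note that an amount of exactly 0 falls outside every bin and becomes NaN, which is why `validate_data` filters out non-positive amounts first.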

Error Handling and Logging

A Robust Error Handling System

#!/usr/bin/env python3
# /opt/nifi/scripts/error_handler.py
"""
Sistema di gestione errori avanzato per ExecuteStreamCommand
"""

import sys
import json
import logging
import traceback
from enum import Enum
from dataclasses import dataclass
from typing import Dict, Any, Optional, List
from datetime import datetime

class ErrorSeverity(Enum):
    """Livelli di gravità errori"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class ProcessingError:
    """Rappresenta un errore di processing"""
    error_type: str
    severity: ErrorSeverity
    message: str
    details: Dict[str, Any]
    timestamp: str
    record_context: Optional[Dict] = None

class ErrorHandler:
    """Gestore errori centralizzato"""

    def __init__(self):
        self.errors: List[ProcessingError] = []
        self.error_counts = {severity: 0 for severity in ErrorSeverity}
        self.max_errors = {
            ErrorSeverity.LOW: 1000,
            ErrorSeverity.MEDIUM: 100,
            ErrorSeverity.HIGH: 10,
            ErrorSeverity.CRITICAL: 1
        }

    def add_error(self, error: ProcessingError):
        """Aggiunge errore e verifica soglie"""
        self.errors.append(error)
        self.error_counts[error.severity] += 1

        # Log the error
        logger = logging.getLogger("ErrorHandler")
        logger.error(f"{error.severity.value.upper()}: {error.message}")

        # Check whether the threshold for this severity was exceeded
        if self.error_counts[error.severity] >= self.max_errors[error.severity]:
            if error.severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]:
                raise ProcessingAbortedException(
                    f"Too many {error.severity.value} errors: {self.error_counts[error.severity]}"
                )

    def get_error_summary(self) -> Dict[str, Any]:
        """Ritorna summary degli errori"""
        return {
            "total_errors": len(self.errors),
            "by_severity": {sev.value: count for sev, count in self.error_counts.items()},
            "critical_errors": [
                {
                    "type": err.error_type,
                    "message": err.message,
                    "timestamp": err.timestamp
                } for err in self.errors if err.severity == ErrorSeverity.CRITICAL
            ]
        }

class ProcessingAbortedException(Exception):
    """Eccezione per interrompere processing"""
    pass

class ResilientProcessor(NiFiStreamProcessor):
    """Processore con gestione errori avanzata"""

    def __init__(self, log_level: str = "INFO"):
        super().__init__(log_level)
        self.error_handler = ErrorHandler()
        self.partial_results = []
        self.continue_on_error = os.getenv("CONTINUE_ON_ERROR", "true").lower() == "true"

    def process_data(self, input_data: str) -> Any:
        """Processing con error handling granulare"""
        try:
            data = json.loads(input_data)

            if isinstance(data, list):
                return self._process_list(data)
            else:
                return self._process_single_record(data, 0)

        except json.JSONDecodeError as e:
            error = ProcessingError(
                error_type="JSON_DECODE_ERROR",
                severity=ErrorSeverity.CRITICAL,
                message=f"Invalid JSON input: {str(e)}",
                details={"input_preview": input_data[:200]},
                timestamp=datetime.now().isoformat()
            )
            self.error_handler.add_error(error)
            raise ProcessingAbortedException("Cannot proceed with invalid JSON")

    def _process_list(self, data_list: List[Dict]) -> Dict[str, Any]:
        """Processa lista con error recovery"""
        successful_records = []

        for idx, record in enumerate(data_list):
            try:
                processed_record = self._process_single_record(record, idx)
                successful_records.append(processed_record)
                self.metrics["records_processed"] += 1

            except Exception as e:
                error = ProcessingError(
                    error_type="RECORD_PROCESSING_ERROR",
                    severity=self._determine_error_severity(e),
                    message=f"Error processing record {idx}: {str(e)}",
                    details={
                        "record_index": idx,
                        "error_class": e.__class__.__name__,
                        "traceback": traceback.format_exc()
                    },
                    timestamp=datetime.now().isoformat(),
                    record_context=record
                )

                try:
                    self.error_handler.add_error(error)

                    if self.continue_on_error:
                        # Keep the record, flagged with the error
                        error_record = record.copy()
                        error_record["_processing_error"] = True
                        error_record["_error_message"] = str(e)
                        successful_records.append(error_record)

                except ProcessingAbortedException:
                    self.logger.critical("Processing aborted due to critical errors")
                    break

        return {
            "data": successful_records,
            "processing_summary": {
                "total_input_records": len(data_list),
                "successful_records": len([r for r in successful_records if not r.get("_processing_error")]),
                "error_records": len([r for r in successful_records if r.get("_processing_error")]),
                "errors_summary": self.error_handler.get_error_summary()
            }
        }

    def _process_single_record(self, record: Dict[str, Any], index: int) -> Dict[str, Any]:
        """Processa singolo record con validazioni"""
        # Validazione schema base
        required_fields = ["id"]
        missing_fields = [field for field in required_fields if field not in record]

        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")

        # Business-logic processing
        processed = record.copy()
        processed["processed_at"] = datetime.now().isoformat()
        processed["record_index"] = index

        # Additional validation
        if "amount" in record and record["amount"] < 0:
            raise ValueError("Amount cannot be negative")

        return processed

    def _determine_error_severity(self, exception: Exception) -> ErrorSeverity:
        """Determina severità errore basata sul tipo"""
        if isinstance(exception, (ValueError, TypeError)):
            return ErrorSeverity.MEDIUM
        elif isinstance(exception, (ConnectionError, TimeoutError)):
            return ErrorSeverity.HIGH
        elif isinstance(exception, (MemoryError, OSError)):
            return ErrorSeverity.CRITICAL
        else:
            return ErrorSeverity.LOW

if __name__ == "__main__":
    processor = ResilientProcessor(os.getenv("LOG_LEVEL", "INFO"))
    processor.run()
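The threshold behaviour of `ErrorHandler` is easy to get wrong, so it is worth checking in isolation: LOW errors only accumulate, while severe errors abort once their limit is reached. A minimal standalone sketch of the same pattern:

```python
# minimal standalone version of the severity-threshold pattern used by
# ErrorHandler above: count errors per severity, abort only for severe ones
class AbortProcessing(Exception):
    """Raised when a severe error category reaches its threshold"""

class ThresholdCounter:
    ABORTING = {"high", "critical"}  # severities that can stop the run

    def __init__(self, limits):
        self.limits = limits                      # severity -> max allowed
        self.counts = {sev: 0 for sev in limits}

    def record(self, severity):
        self.counts[severity] += 1
        if severity in self.ABORTING and self.counts[severity] >= self.limits[severity]:
            raise AbortProcessing(f"too many {severity} errors")
```

With `limits={"low": 2, "critical": 1}`, any number of low errors is tolerated, but the very first critical error raises `AbortProcessing`, mirroring the behaviour of `ErrorHandler.add_error`.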

Microservices Pattern

Service-Oriented Architecture with NiFi

#!/usr/bin/env python3
# /opt/nifi/scripts/microservice_gateway.py
"""
Gateway per orchestrazione microservizi via ExecuteStreamCommand
"""

import asyncio
import aiohttp
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import time

class ServiceType(Enum):
    """Tipi di servizi disponibili"""
    VALIDATION = "validation"
    TRANSFORMATION = "transformation"
    ENRICHMENT = "enrichment"
    NOTIFICATION = "notification"

@dataclass
class ServiceEndpoint:
    """Rappresenta un endpoint di microservizio"""
    name: str
    url: str
    service_type: ServiceType
    timeout: int = 30
    retry_count: int = 3
    required: bool = True

class MicroserviceOrchestrator(NiFiStreamProcessor):
    """Orchestratore per chiamate multiple microservizi"""

    def __init__(self, log_level: str = "INFO"):
        super().__init__(log_level)
        self.services = self._load_service_configuration()
        self.circuit_breakers = {}

    def _load_service_configuration(self) -> List[ServiceEndpoint]:
        """Carica configurazione servizi da environment"""
        return [
            ServiceEndpoint(
                name="data_validator",
                url=os.getenv("VALIDATOR_SERVICE_URL", "http://validator:8080/validate"),
                service_type=ServiceType.VALIDATION,
                timeout=10,
                required=True
            ),
            ServiceEndpoint(
                name="data_transformer",
                url=os.getenv("TRANSFORMER_SERVICE_URL", "http://transformer:8080/transform"),
                service_type=ServiceType.TRANSFORMATION,
                timeout=30,
                required=True
            ),
            ServiceEndpoint(
                name="data_enricher",
                url=os.getenv("ENRICHER_SERVICE_URL", "http://enricher:8080/enrich"),
                service_type=ServiceType.ENRICHMENT,
                timeout=20,
                required=False
            ),
            ServiceEndpoint(
                name="notification_service",
                url=os.getenv("NOTIFICATION_SERVICE_URL", "http://notifier:8080/notify"),
                service_type=ServiceType.NOTIFICATION,
                timeout=5,
                required=False
            )
        ]

    def process_data(self, input_data: str) -> Any:
        """Synchronous entry point required by the base run(); drives the async pipeline"""
        return asyncio.run(self._process_async(input_data))

    async def _process_async(self, input_data: str) -> Any:
        """Orchestrated processing across the microservices"""
        try:
            data = json.loads(input_data)
            self.logger.info(f"Starting microservice orchestration for {len(data) if isinstance(data, list) else 1} records")

            # Sequential microservice pipeline
            result = data

            # 1. Validation Service
            validation_service = next((s for s in self.services if s.service_type == ServiceType.VALIDATION), None)
            if validation_service:
                result = await self._call_service(validation_service, result)

            # 2. Transformation Service
            transformation_service = next((s for s in self.services if s.service_type == ServiceType.TRANSFORMATION), None)
            if transformation_service:
                result = await self._call_service(transformation_service, result)

            # 3. Enrichment Service (optional)
            enrichment_service = next((s for s in self.services if s.service_type == ServiceType.ENRICHMENT), None)
            if enrichment_service:
                try:
                    result = await self._call_service(enrichment_service, result)
                except Exception as e:
                    self.logger.warning(f"Optional enrichment failed: {e}")

            # 4. Notification (best-effort: a bare fire-and-forget task would be
            # cancelled when asyncio.run() closes the loop, so await it here)
            notification_service = next((s for s in self.services if s.service_type == ServiceType.NOTIFICATION), None)
            if notification_service:
                try:
                    await self._notify_async(notification_service, result)
                except Exception as e:
                    self.logger.warning(f"Notification failed: {e}")

            return {
                "data": result,
                "processing_metadata": {
                    "services_called": len([s for s in self.services if s.required or s.service_type != ServiceType.NOTIFICATION]),
                    "timestamp": datetime.now().isoformat(),
                    "orchestrator_version": "1.0.0"
                }
            }

        except Exception as e:
            self.logger.error(f"Orchestration failed: {e}")
            raise

    async def _call_service(self, service: ServiceEndpoint, data: Any) -> Any:
        """Chiama singolo microservizio con resilienza"""
        for attempt in range(service.retry_count):
            try:
                # Check circuit breaker
                if self._is_circuit_open(service.name):
                    if service.required:
                        raise Exception(f"Circuit breaker open for required service {service.name}")
                    else:
                        self.logger.warning(f"Skipping optional service {service.name} (circuit breaker open)")
                        return data

                start_time = time.time()

                async with aiohttp.ClientSession() as session:
                    payload = {
                        "data": data,
                        "request_id": f"{service.name}_{int(time.time())}",
                        "source": "nifi_executestream"
                    }

                    async with session.post(
                        service.url,
                        json=payload,
                        timeout=aiohttp.ClientTimeout(total=service.timeout)
                    ) as response:

                        if response.status == 200:
                            result = await response.json()
                            duration = time.time() - start_time

                            self.logger.info(f"Service {service.name} completed in {duration:.2f}s")
                            self._record_success(service.name)

                            return result.get("data", result)
                        else:
                            raise aiohttp.ClientResponseError(
                                request_info=response.request_info,
                                history=response.history,
                                status=response.status,
                                message=f"Service {service.name} returned {response.status}"
                            )

            except Exception as e:
                self.logger.warning(f"Service {service.name} attempt {attempt + 1} failed: {e}")
                self._record_failure(service.name)

                if attempt == service.retry_count - 1:
                    if service.required:
                        raise Exception(f"Required service {service.name} failed after {service.retry_count} attempts")
                    else:
                        self.logger.warning(f"Optional service {service.name} failed, continuing without")
                        return data

                # Exponential backoff
                await asyncio.sleep(2 ** attempt)

        return data

    async def _notify_async(self, service: ServiceEndpoint, data: Any):
        """Asynchronous notification (fire and forget)"""
        try:
            await self._call_service(
                service,
                {
                    "notification_type": "processing_complete",
                    "summary": {"record_count": len(data) if isinstance(data, list) else 1},
                },
            )
        except Exception as e:
            self.logger.warning(f"Async notification failed: {e}")

    def _is_circuit_open(self, service_name: str) -> bool:
        """Verifica stato circuit breaker"""
        # Implementazione semplificata circuit breaker
        breaker = self.circuit_breakers.get(service_name, {"failures": 0, "last_failure": 0})

        # Reset after 60 seconds
        if time.time() - breaker["last_failure"] > 60:
            breaker["failures"] = 0

        return breaker["failures"] >= 5

    def _record_success(self, service_name: str):
        """Registra successo chiamata"""
        if service_name in self.circuit_breakers:
            self.circuit_breakers[service_name]["failures"] = 0

    def _record_failure(self, service_name: str):
        """Registra fallimento chiamata"""
        if service_name not in self.circuit_breakers:
            self.circuit_breakers[service_name] = {"failures": 0, "last_failure": 0}

        self.circuit_breakers[service_name]["failures"] += 1
        self.circuit_breakers[service_name]["last_failure"] = time.time()

    def run(self):
        """Override per supporto asyncio"""
        try:
            self.logger.info("Starting microservice orchestration")

            input_data = self.read_input()
            if input_data is None:
                sys.exit(1)

            # Run async processing; asyncio.run creates the event loop and
            # closes it (cancelling any still-pending tasks) when done
            result = asyncio.run(self.process_data(input_data))

            if not self.write_output(result):
                sys.exit(1)

            self.logger.info("Orchestration completed successfully")
            sys.exit(0)

        except Exception as e:
            self.logger.error(f"Fatal orchestration error: {e}")
            sys.exit(1)

if __name__ == "__main__":
    processor = MicroserviceOrchestrator(os.getenv("LOG_LEVEL", "INFO"))
    processor.run()
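Before deploying, the orchestrator's stdin/stdout contract can be exercised outside NiFi by piping JSON into the script. A minimal, self-contained sketch (the inline child script is only a stand-in for the real orchestrator, which would be invoked by its path):

```python
import json
import subprocess
import sys

# Inline stand-in for the orchestrator: reads JSON on stdin, writes JSON on stdout.
child_script = (
    "import sys, json;"
    "data = json.load(sys.stdin);"
    "json.dump({'data': data, 'status': 'ok'}, sys.stdout)"
)

payload = [{"id": 1}, {"id": 2}]
result = subprocess.run(
    [sys.executable, "-c", child_script],
    input=json.dumps(payload),   # what NiFi would stream as FlowFile content
    capture_output=True,
    text=True,
    timeout=10,
)

output = json.loads(result.stdout)
print(output["status"])      # ok
print(len(output["data"]))   # 2
```

The same technique works for any ExecuteStreamCommand script: exit code 0 plus well-formed stdout is exactly what the processor expects on the success relationship.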

Monitoring and alerting

Integrated monitoring system

#!/usr/bin/env python3
# /opt/nifi/scripts/monitoring_processor.py
"""
Processor with integrated monitoring and metrics
"""

import os
import time
import json
import threading
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List

import psutil
import requests

# NiFiStreamProcessor is the base class defined earlier in this guide.

@dataclass
class ProcessingMetrics:
    """Metriche dettagliate di processing"""
    # Performance metrics
    processing_time_seconds: float = 0.0
    memory_usage_mb: float = 0.0
    cpu_usage_percent: float = 0.0

    # Data metrics
    records_processed: int = 0
    records_success: int = 0
    records_error: int = 0
    input_size_bytes: int = 0
    output_size_bytes: int = 0

    # Quality metrics
    data_quality_score: float = 0.0
    validation_errors: int = 0
    transformation_errors: int = 0

    # Operational metrics
    external_calls_count: int = 0
    external_calls_failed: int = 0
    cache_hits: int = 0
    cache_misses: int = 0

class MetricsCollector:
    """Collector per metriche system e application"""

    def __init__(self, process_name: str):
        self.process_name = process_name
        self.start_time = time.time()
        self.process = psutil.Process()

    def collect_system_metrics(self) -> Dict[str, Any]:
        """Raccoglie metriche di sistema"""
        return {
            "cpu_percent": self.process.cpu_percent(),
            "memory_mb": self.process.memory_info().rss / 1024 / 1024,
            "num_threads": self.process.num_threads(),
            "open_files": len(self.process.open_files()),
            "uptime_seconds": time.time() - self.start_time
        }

    def collect_nifi_context_metrics(self) -> Dict[str, Any]:
        """Raccoglie metriche specifiche del contesto NiFi"""
        return {
            "nifi_processor": self.process_name,
            "execution_id": os.getenv("NIFI_EXECUTION_ID", "unknown"),
            "flowfile_uuid": os.getenv("NIFI_FLOWFILE_UUID", "unknown"),
            "processor_group": os.getenv("NIFI_PROCESSOR_GROUP", "unknown")
        }

class MonitoredProcessor(NiFiStreamProcessor):
    """Processore con monitoring completo"""

    def __init__(self, log_level: str = "INFO"):
        super().__init__(log_level)
        self.metrics = ProcessingMetrics()
        self.metrics_collector = MetricsCollector("ExecuteStreamCommand")
        self.monitoring_config = {
            "enabled": os.getenv("MONITORING_ENABLED", "true").lower() == "true",
            "metrics_endpoint": os.getenv("METRICS_ENDPOINT", "http://prometheus:9090/metrics"),
            "alert_endpoint": os.getenv("ALERT_ENDPOINT", "http://alertmanager:9093/api/v1/alerts"),
            "reporting_interval": int(os.getenv("METRICS_INTERVAL", "30"))
        }

        # Start the monitoring thread
        if self.monitoring_config["enabled"]:
            self._start_monitoring_thread()

    def _start_monitoring_thread(self):
        """Avvia thread per reporting metriche periodico"""
        def monitoring_worker():
            while True:
                try:
                    self._report_metrics()
                    time.sleep(self.monitoring_config["reporting_interval"])
                except Exception as e:
                    self.logger.warning(f"Metrics reporting failed: {e}")

        monitor_thread = threading.Thread(target=monitoring_worker, daemon=True)
        monitor_thread.start()

    def process_data(self, input_data: str) -> Any:
        """Processing con raccolta metriche automatica"""
        start_time = time.time()
        start_memory = self.metrics_collector.process.memory_info().rss

        try:
            # Input metrics
            self.metrics.input_size_bytes = len(input_data.encode('utf-8'))

            # Parse and validate
            data = json.loads(input_data)

            if isinstance(data, list):
                self.metrics.records_processed = len(data)
                processed_data = []

                for record in data:
                    try:
                        processed_record = self._process_record_with_monitoring(record)
                        processed_data.append(processed_record)
                        self.metrics.records_success += 1
                    except Exception as e:
                        self.logger.warning(f"Record processing failed: {e}")
                        self.metrics.records_error += 1
                        self.metrics.validation_errors += 1

                        # Include the failed record if configured to do so
                        if os.getenv("INCLUDE_ERROR_RECORDS", "false").lower() == "true":
                            error_record = {"error": str(e), "original_data": record}
                            processed_data.append(error_record)
            else:
                self.metrics.records_processed = 1
                try:
                    processed_data = self._process_record_with_monitoring(data)
                    self.metrics.records_success = 1
                except Exception as e:
                    self.metrics.records_error = 1
                    processed_data = {"error": str(e), "original_data": data}

            # Compute final metrics
            self.metrics.processing_time_seconds = time.time() - start_time
            self.metrics.memory_usage_mb = (
                self.metrics_collector.process.memory_info().rss - start_memory
            ) / 1024 / 1024
            self.metrics.cpu_usage_percent = self.metrics_collector.process.cpu_percent()

            # Compute data quality score
            if self.metrics.records_processed > 0:
                self.metrics.data_quality_score = (
                    self.metrics.records_success / self.metrics.records_processed
                ) * 100

            # Output with metrics
            result = {
                "data": processed_data,
                "processing_metrics": asdict(self.metrics),
                "system_metrics": self.metrics_collector.collect_system_metrics(),
                "nifi_context": self.metrics_collector.collect_nifi_context_metrics()
            }

            self.metrics.output_size_bytes = len(json.dumps(result).encode('utf-8'))

            # Check thresholds and raise alerts if needed
            self._check_alert_conditions()

            return result

        except Exception as e:
            self.metrics.processing_time_seconds = time.time() - start_time
            self.metrics.records_error += 1
            raise

    def _process_record_with_monitoring(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Processa singolo record con monitoring"""
        record_start = time.time()

        # Simulated business-logic processing
        processed = record.copy()
        processed["processed_at"] = datetime.now().isoformat()
        processed["processing_duration_ms"] = (time.time() - record_start) * 1000

        # Simulated external call (with monitoring)
        if "external_lookup_required" in record:
            self._external_service_call_monitored(record["id"])

        return processed

    def _external_service_call_monitored(self, record_id: str):
        """Chiamata servizio esterno con monitoring"""
        self.metrics.external_calls_count += 1

        try:
            # Simulated external call
            time.sleep(0.1)  # Placeholder for the real call

            # Check cache first
            if self._check_cache(record_id):
                self.metrics.cache_hits += 1
            else:
                self.metrics.cache_misses += 1
                # Actual external call would go here

        except Exception as e:
            self.metrics.external_calls_failed += 1
            raise

    def _check_cache(self, key: str) -> bool:
        """Verifica cache (placeholder)"""
        # Simulazione cache lookup
        return hash(key) % 4 == 0  # 25% cache hit rate

    def _check_alert_conditions(self):
        """Verifica condizioni per alert"""
        alerts = []

        # Performance alerts
        if self.metrics.processing_time_seconds > 300:  # 5 minutes
            alerts.append({
                "severity": "warning",
                "type": "performance",
                "message": f"Processing time exceeded threshold: {self.metrics.processing_time_seconds:.2f}s"
            })

        if self.metrics.memory_usage_mb > 1024:  # 1GB
            alerts.append({
                "severity": "warning",
                "type": "resource",
                "message": f"High memory usage: {self.metrics.memory_usage_mb:.2f}MB"
            })

        # Quality alerts
        if self.metrics.data_quality_score < 95.0:
            alerts.append({
                "severity": "warning",
                "type": "data_quality",
                "message": f"Data quality below threshold: {self.metrics.data_quality_score:.1f}%"
            })

        # Error rate alerts
        if self.metrics.records_processed > 0:
            error_rate = (self.metrics.records_error / self.metrics.records_processed) * 100
            if error_rate > 10.0:
                alerts.append({
                    "severity": "critical",
                    "type": "error_rate",
                    "message": f"High error rate: {error_rate:.1f}%"
                })

        # Send alerts if any
        if alerts:
            self._send_alerts(alerts)

    def _send_alerts(self, alerts: List[Dict[str, Any]]):
        """Invia alert al sistema di alerting"""
        try:
            payload = {
                "alerts": alerts,
                "source": "nifi_executestream",
                "timestamp": datetime.now().isoformat(),
                "context": self.metrics_collector.collect_nifi_context_metrics()
            }

            requests.post(
                self.monitoring_config["alert_endpoint"],
                json=payload,
                timeout=5
            )

            self.logger.warning(f"Sent {len(alerts)} alerts to monitoring system")

        except Exception as e:
            self.logger.error(f"Failed to send alerts: {e}")

    def _report_metrics(self):
        """Invia metriche a sistema di monitoring"""
        try:
            metrics_data = {
                "processing_metrics": asdict(self.metrics),
                "system_metrics": self.metrics_collector.collect_system_metrics(),
                "timestamp": datetime.now().isoformat()
            }

            # Push to a metrics backend (e.g. Pushgateway or InfluxDB; core Prometheus is pull-based)
            requests.post(
                self.monitoring_config["metrics_endpoint"],
                json=metrics_data,
                timeout=10
            )

        except Exception as e:
            self.logger.debug(f"Metrics reporting failed: {e}")

if __name__ == "__main__":
    processor = MonitoredProcessor(os.getenv("LOG_LEVEL", "INFO"))
    processor.run()
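The quality score computed in `process_data` is simply the success ratio expressed as a percentage. Isolated into a standalone sketch (class and field names here are illustrative):

```python
from dataclasses import dataclass

@dataclass
class QualityCounters:
    records_processed: int = 0
    records_success: int = 0

    @property
    def data_quality_score(self) -> float:
        # Percentage of successfully processed records; 0 when nothing was processed.
        if self.records_processed == 0:
            return 0.0
        return (self.records_success / self.records_processed) * 100

counters = QualityCounters(records_processed=40, records_success=38)
print(f"{counters.data_quality_score:.1f}")  # 95.0
```

With the 95% threshold used by `_check_alert_conditions`, this batch would sit exactly at the limit: one more failed record and a data-quality alert would fire.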

Security and compliance

Security hardening for ExecuteStreamCommand
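The log-sanitization approach used by the processor below boils down to regex substitution over a fixed pattern list. A minimal standalone sketch (patterns simplified for illustration):

```python
import re

# Illustrative patterns for data that should never reach the logs
# (simplified versions of the patterns used below).
SENSITIVE_PATTERNS = [
    r'\b\d{3}-\d{2}-\d{4}\b',                               # US SSN
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email
]

def sanitize(message: str) -> str:
    # Replace every match with a fixed placeholder.
    for pattern in SENSITIVE_PATTERNS:
        message = re.sub(pattern, '[REDACTED]', message)
    return message

print(sanitize("user john.doe@example.com failed login, ssn 123-45-6789"))
# user [REDACTED] failed login, ssn [REDACTED]
```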

#!/usr/bin/env python3
# /opt/nifi/scripts/secure_processor.py
"""
Processor with security hardening and compliance
"""

import base64
import hashlib
import hmac
import json
import os
import re
from datetime import datetime
from typing import Any, Dict

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# NiFiStreamProcessor is the base class defined earlier in this guide.

class SecurityManager:
    """Gestore sicurezza per ExecuteStreamCommand"""

    def __init__(self):
        self.encryption_key = self._derive_encryption_key()
        self.cipher_suite = Fernet(self.encryption_key)
        self.sensitive_patterns = [
            r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # Email
            r'\b(?:\d{1,3}\.){3}\d{1,3}\b',  # IP addresses
        ]

    def _derive_encryption_key(self) -> bytes:
        """Deriva chiave crittografia da password"""
        password = os.getenv("ENCRYPTION_PASSWORD", "default-key-change-in-production").encode()
        salt = os.getenv("ENCRYPTION_SALT", "default-salt").encode()

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )

        key = base64.urlsafe_b64encode(kdf.derive(password))
        return key

    def encrypt_sensitive_data(self, data: str) -> str:
        """Cripta dati sensibili"""
        try:
            encrypted = self.cipher_suite.encrypt(data.encode())
            return base64.urlsafe_b64encode(encrypted).decode()
        except Exception as e:
            raise SecurityException(f"Encryption failed: {e}")

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Decripta dati sensibili"""
        try:
            decoded = base64.urlsafe_b64decode(encrypted_data.encode())
            decrypted = self.cipher_suite.decrypt(decoded)
            return decrypted.decode()
        except Exception as e:
            raise SecurityException(f"Decryption failed: {e}")

    def sanitize_logs(self, log_message: str) -> str:
        """Sanitizza log rimuovendo dati sensibili"""
        sanitized = log_message

        for pattern in self.sensitive_patterns:
            sanitized = re.sub(pattern, '[REDACTED]', sanitized)

        return sanitized

    def validate_input_security(self, data: Any) -> bool:
        """Valida input per security violations"""
        data_str = json.dumps(data) if isinstance(data, (dict, list)) else str(data)

        # Check for injection patterns
        dangerous_patterns = [
            r'<script[^>]*>.*?</script>',  # XSS
            r'(union|select|insert|update|delete|drop)\s+',  # SQL injection
            r'(eval|exec|system|shell_exec)\s*\(',  # Code injection
            r'\.\./',  # Path traversal
        ]

        for pattern in dangerous_patterns:
            if re.search(pattern, data_str, re.IGNORECASE):
                return False

        return True

    def generate_audit_hash(self, data: Any) -> str:
        """Genera hash per audit trail"""
        data_str = json.dumps(data, sort_keys=True) if isinstance(data, (dict, list)) else str(data)
        timestamp = datetime.now().isoformat()

        content = f"{data_str}:{timestamp}"
        return hashlib.sha256(content.encode()).hexdigest()

    def verify_request_signature(self, data: str, signature: str, secret: str) -> bool:
        """Verifica firma HMAC della richiesta"""
        try:
            expected_signature = hmac.new(
                secret.encode(),
                data.encode(),
                hashlib.sha256
            ).hexdigest()

            return hmac.compare_digest(signature, expected_signature)
        except Exception:
            return False

class SecurityException(Exception):
    """Eccezione per violazioni security"""
    pass

class AuditLogger:
    """Logger per audit trail"""

    def __init__(self):
        self.audit_file = os.getenv("AUDIT_LOG_FILE", "/opt/nifi/logs/audit.log")
        os.makedirs(os.path.dirname(self.audit_file), exist_ok=True)

    def log_processing_event(self, event_type: str, user_context: Dict, data_hash: str,
                           processing_details: Dict):
        """Logga evento di processing per compliance"""
        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "user_context": user_context,
            "data_hash": data_hash,
            "processing_details": processing_details,
            "compliance_flags": {
                "gdpr_applicable": True,
                "data_retention_days": 365,
                "encryption_used": True
            }
        }

        with open(self.audit_file, 'a') as f:
            f.write(json.dumps(audit_entry) + '\n')

class SecureProcessor(NiFiStreamProcessor):
    """Processore con security e compliance integrate"""

    def __init__(self, log_level: str = "INFO"):
        super().__init__(log_level)
        self.security_manager = SecurityManager()
        self.audit_logger = AuditLogger()
        self.compliance_mode = os.getenv("COMPLIANCE_MODE", "strict")
        self.max_data_size = int(os.getenv("MAX_DATA_SIZE_BYTES", "10485760"))  # 10MB

    def process_data(self, input_data: str) -> Any:
        """Processing con security e compliance"""

        # Security validations
        if len(input_data.encode('utf-8')) > self.max_data_size:
            raise SecurityException(f"Input data exceeds maximum size: {self.max_data_size} bytes")

        try:
            data = json.loads(input_data)
        except json.JSONDecodeError:
            raise SecurityException("Invalid JSON input - potential injection attempt")

        # Validate input security
        if not self.security_manager.validate_input_security(data):
            raise SecurityException("Input contains potentially dangerous patterns")

        # Generate audit hash
        data_hash = self.security_manager.generate_audit_hash(data)

        # Extract user context from NiFi attributes
        user_context = {
            "nifi_user": os.getenv("NIFI_USER", "system"),
            "source_ip": os.getenv("NIFI_SOURCE_IP", "unknown"),
            "execution_context": os.getenv("NIFI_EXECUTION_ID", "unknown")
        }

        # Log processing start
        self.audit_logger.log_processing_event(
            "processing_started",
            user_context,
            data_hash,
            {"record_count": len(data) if isinstance(data, list) else 1}
        )

        try:
            # Actual processing
            processed_data = self._secure_process_records(data)

            # Compliance post-processing
            final_result = self._apply_compliance_rules(processed_data)

            # Log successful processing
            self.audit_logger.log_processing_event(
                "processing_completed",
                user_context,
                self.security_manager.generate_audit_hash(final_result),
                {
                    "records_processed": len(final_result.get("data", [])) if isinstance(final_result, dict) else 1,
                    "security_level": "high",
                    "compliance_verified": True
                }
            )

            return final_result

        except Exception as e:
            # Log processing error
            self.audit_logger.log_processing_event(
                "processing_error",
                user_context,
                data_hash,
                {"error": str(e), "error_type": e.__class__.__name__}
            )
            raise

    def _secure_process_records(self, data: Any) -> Any:
        """Processa record con sicurezza"""
        if isinstance(data, list):
            processed_records = []

            for idx, record in enumerate(data):
                # Sanitize record
                sanitized_record = self._sanitize_record(record)

                # Process business logic
                processed_record = self._apply_business_logic(sanitized_record, idx)

                # Encrypt sensitive fields if required
                if self.compliance_mode == "strict":
                    processed_record = self._encrypt_sensitive_fields(processed_record)

                processed_records.append(processed_record)

            return processed_records
        else:
            sanitized_data = self._sanitize_record(data)
            processed_data = self._apply_business_logic(sanitized_data, 0)

            if self.compliance_mode == "strict":
                processed_data = self._encrypt_sensitive_fields(processed_data)

            return processed_data

    def _sanitize_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Sanitizza record per security"""
        sanitized = {}

        for key, value in record.items():
            # Sanitize key names
            safe_key = re.sub(r'[^a-zA-Z0-9_]', '_', key)

            # Sanitize values
            if isinstance(value, str):
                # Remove potential script injection
                safe_value = re.sub(r'<[^>]*>', '', value)
                # Limit string length
                safe_value = safe_value[:1000] if len(safe_value) > 1000 else safe_value
            else:
                safe_value = value

            sanitized[safe_key] = safe_value

        return sanitized

    def _apply_business_logic(self, record: Dict[str, Any], index: int) -> Dict[str, Any]:
        """Applica business logic con security"""
        processed = record.copy()

        # Add processing metadata
        processed["processed_at"] = datetime.now().isoformat()
        processed["security_level"] = "verified"
        processed["record_index"] = index

        # Validate required security fields
        if "user_id" in record:
            processed["user_id_hash"] = hashlib.sha256(str(record["user_id"]).encode()).hexdigest()

        return processed

    def _encrypt_sensitive_fields(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Cripta campi sensibili"""
        sensitive_fields = ["ssn", "credit_card", "personal_data", "private_info"]
        encrypted_record = record.copy()

        for field in sensitive_fields:
            if field in record and record[field]:
                try:
                    encrypted_value = self.security_manager.encrypt_sensitive_data(str(record[field]))
                    encrypted_record[f"{field}_encrypted"] = encrypted_value
                    # Remove original sensitive data
                    del encrypted_record[field]
                except Exception as e:
                    self.logger.warning(f"Failed to encrypt field {field}: {e}")

        return encrypted_record

    def _apply_compliance_rules(self, processed_data: Any) -> Dict[str, Any]:
        """Applica regole compliance (GDPR, HIPAA, etc.)"""

        compliance_metadata = {
            "gdpr_compliance": {
                "data_processing_lawful_basis": "legitimate_interest",
                "data_subject_rights_respected": True,
                "retention_period_days": 365,
                "encrypted_fields_count": self._count_encrypted_fields(processed_data)
            },
            "security_measures": {
                "encryption_at_rest": True,
                "access_logging": True,
                "input_validation": True,
                "output_sanitization": True
            },
            "audit_trail": {
                "processing_timestamp": datetime.now().isoformat(),
                "compliance_version": "1.0",
                "security_officer": os.getenv("SECURITY_OFFICER", "system")
            }
        }

        return {
            "data": processed_data,
            "compliance_metadata": compliance_metadata,
            "processing_signature": self.security_manager.generate_audit_hash(processed_data)
        }

    def _count_encrypted_fields(self, data: Any) -> int:
        """Conta campi crittografati"""
        count = 0

        def count_in_dict(d):
            nonlocal count
            if isinstance(d, dict):
                for key, value in d.items():
                    if key.endswith('_encrypted'):
                        count += 1
                    elif isinstance(value, (dict, list)):
                        count_in_dict(value)
            elif isinstance(d, list):
                for item in d:
                    count_in_dict(item)

        count_in_dict(data)
        return count

if __name__ == "__main__":
    processor = SecureProcessor(os.getenv("LOG_LEVEL", "INFO"))
    processor.run()
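The signature check in `verify_request_signature` relies only on the standard library and can be verified in isolation. A minimal sketch (payload and secret are illustrative):

```python
import hashlib
import hmac

def sign(data: str, secret: str) -> str:
    # Hex-encoded HMAC-SHA256 signature of the payload.
    return hmac.new(secret.encode(), data.encode(), hashlib.sha256).hexdigest()

def verify(data: str, signature: str, secret: str) -> bool:
    # compare_digest gives a constant-time comparison, avoiding timing side channels.
    return hmac.compare_digest(signature, sign(data, secret))

payload = '{"user_id": 42}'
sig = sign(payload, "shared-secret")
print(verify(payload, sig, "shared-secret"))  # True
print(verify(payload, sig, "wrong-secret"))   # False
```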

Troubleshooting and best practices

Performance diagnostics and tuning

#!/bin/bash
# /opt/nifi/scripts/diagnostic_tools.sh
# Diagnostic tools for ExecuteStreamCommand

set -euo pipefail

# Configuration
NIFI_HOME="${NIFI_HOME:-/opt/nifi}"
SCRIPTS_DIR="$NIFI_HOME/scripts"
LOGS_DIR="$NIFI_HOME/logs"
WORKING_DIR="$NIFI_HOME/working"

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# System check
check_system_requirements() {
    log_info "Checking system requirements..."

    # Check Python
    if command -v python3 &> /dev/null; then
        PYTHON_VERSION=$(python3 --version | cut -d' ' -f2)
        log_info "Python version: $PYTHON_VERSION"
    else
        log_error "Python3 not found"
        return 1
    fi

    # Check permissions
    if [ -w "$WORKING_DIR" ]; then
        log_info "Working directory writable: $WORKING_DIR"
    else
        log_error "Working directory not writable: $WORKING_DIR"
        return 1
    fi

    # Check disk space
    AVAILABLE_SPACE=$(df -h "$NIFI_HOME" | awk 'NR==2 {print $4}')
    log_info "Available disk space: $AVAILABLE_SPACE"

    # Check memory
    AVAILABLE_MEMORY=$(free -h | awk 'NR==2{print $7}')
    log_info "Available memory: $AVAILABLE_MEMORY"

    return 0
}

# Test ExecuteStreamCommand configuration
test_executestream_config() {
    log_info "Testing ExecuteStreamCommand configuration..."

    # Sample test script
    cat > "$WORKING_DIR/test_script.py" << 'EOF'
#!/usr/bin/env python3
import sys
import json

try:
    input_data = sys.stdin.read()
    if input_data:
        data = json.loads(input_data)
        data["test_processed"] = True
        print(json.dumps(data))
        sys.exit(0)
    else:
        print("No input received")
        sys.exit(1)
except Exception as e:
    print(f"Error: {e}")
    sys.exit(1)
EOF

    chmod +x "$WORKING_DIR/test_script.py"

    # Feed test input; test the pipeline directly in the `if` condition,
    # since under `set -e` a failing pipeline would abort before a `$?` check
    if echo '{"test": "data", "timestamp": "'"$(date -Iseconds)"'"}' | \
        python3 "$WORKING_DIR/test_script.py"; then
        log_info "Basic script execution test: PASSED"
    else
        log_error "Basic script execution test: FAILED"
        return 1
    fi
}

# Performance benchmark
performance_benchmark() {
    log_info "Running performance benchmark..."

    # Create benchmark script
    cat > "$WORKING_DIR/benchmark_script.py" << 'EOF'
#!/usr/bin/env python3
import sys
import json
import time
import psutil
import os

start_time = time.time()
process = psutil.Process()
start_memory = process.memory_info().rss

try:
    input_data = sys.stdin.read()
    data = json.loads(input_data)

    # Simulate CPU-intensive processing
    if isinstance(data, list):
        for i, record in enumerate(data):
            record["processed_index"] = i
            record["benchmark_timestamp"] = time.time()
            # Simulate computation
            result = sum(range(1000))
            record["computation_result"] = result

    processing_time = time.time() - start_time
    end_memory = process.memory_info().rss
    memory_used = (end_memory - start_memory) / 1024 / 1024  # MB

    result = {
        "data": data,
        "benchmark_results": {
            "processing_time_seconds": processing_time,
            "memory_used_mb": memory_used,
            "records_processed": len(data) if isinstance(data, list) else 1,
            "throughput_records_per_second": (len(data) if isinstance(data, list) else 1) / processing_time if processing_time > 0 else 0
        }
    }

    print(json.dumps(result, indent=2))

except Exception as e:
    print(json.dumps({"error": str(e)}))
    sys.exit(1)
EOF

    # Generate test data
    python3 -c "
import json
test_data = [{'id': i, 'value': f'test_{i}', 'amount': i * 10} for i in range(1000)]
print(json.dumps(test_data))
" | python3 "$WORKING_DIR/benchmark_script.py" > "$LOGS_DIR/benchmark_results.json"

    # Analyze results
    python3 -c "
import json
with open('$LOGS_DIR/benchmark_results.json', 'r') as f:
    results = json.load(f)

if 'benchmark_results' in results:
    bench = results['benchmark_results']
    print(f\"Processing Time: {bench['processing_time_seconds']:.3f}s\")
    print(f\"Memory Used: {bench['memory_used_mb']:.2f}MB\")
    print(f\"Throughput: {bench['throughput_records_per_second']:.1f} records/sec\")

    # Performance thresholds
    if bench['processing_time_seconds'] > 10:
        print('WARNING: Processing time exceeded 10 seconds')
    if bench['memory_used_mb'] > 500:
        print('WARNING: Memory usage exceeded 500MB')
    if bench['throughput_records_per_second'] < 50:
        print('WARNING: Low throughput detected')
"
}

# Memory leak detection
detect_memory_leaks() {
    log_info "Running memory leak detection..."

    cat > "$WORKING_DIR/memory_test_script.py" << 'EOF'
#!/usr/bin/env python3
import sys
import json
import psutil
import time
import gc

def process_batch():
    # Simulate processing that could leak memory
    data = []
    for i in range(10000):
        data.append({"id": i, "data": "x" * 1000})

    # Process data
    processed = []
    for item in data:
        processed_item = item.copy()
        processed_item["processed"] = True
        processed.append(processed_item)

    return len(processed)

# Monitor memory across multiple iterations
process = psutil.Process()
memory_usage = []

for iteration in range(5):
    gc.collect()  # Force garbage collection
    start_memory = process.memory_info().rss / 1024 / 1024

    result_count = process_batch()

    end_memory = process.memory_info().rss / 1024 / 1024
    memory_diff = end_memory - start_memory

    memory_usage.append({
        "iteration": iteration,
        "start_memory_mb": start_memory,
        "end_memory_mb": end_memory,
        "memory_diff_mb": memory_diff,
        "records_processed": result_count
    })

    print(f"Iteration {iteration}: Memory usage {memory_diff:.2f}MB", file=sys.stderr)

# Analyze memory trend
memory_trend = [usage["memory_diff_mb"] for usage in memory_usage]
avg_memory_diff = sum(memory_trend) / len(memory_trend)

result = {
    "memory_analysis": {
        "iterations": memory_usage,
        "average_memory_per_iteration_mb": avg_memory_diff,
        "potential_memory_leak": avg_memory_diff > 50  # Soglia 50MB per iterazione
    }
}

print(json.dumps(result, indent=2))
EOF

    echo '{"test": "memory_leak_detection"}' | python3 "$WORKING_DIR/memory_test_script.py" > "$LOGS_DIR/memory_analysis.json"

    # Analyze memory leak results
    python3 -c "
import json
with open('$LOGS_DIR/memory_analysis.json', 'r') as f:
    results = json.load(f)

analysis = results['memory_analysis']
if analysis['potential_memory_leak']:
    print('WARNING: Potential memory leak detected!')
    print(f'Average memory per iteration: {analysis[\"average_memory_per_iteration_mb\"]:.2f}MB')
else:
    print('Memory usage appears stable')
"
}

# Network connectivity test
test_network_connectivity() {
    log_info "Testing network connectivity for external services..."

    # Test common endpoints
    endpoints=(
        "httpbin.org:80"
        "jsonplaceholder.typicode.com:443"
        "api.github.com:443"
    )

    for endpoint in "${endpoints[@]}"; do
        IFS=':' read -r host port <<< "$endpoint"

        if timeout 5 bash -c "echo >/dev/tcp/$host/$port" 2>/dev/null; then
            log_info "✓ $endpoint - Reachable"
        else
            log_warn "✗ $endpoint - Not reachable"
        fi
    done
}

# Log analysis
analyze_logs() {
    log_info "Analyzing ExecuteStreamCommand logs..."

    if [ -f "$LOGS_DIR/processor.log" ]; then
        # Most common errors; `|| true` keeps `set -e`/`pipefail` from
        # aborting the script when grep finds no matches
        log_info "Most common errors:"
        grep -i "error" "$LOGS_DIR/processor.log" | \
            cut -d'-' -f4- | \
            sort | uniq -c | sort -nr | head -5 || true

        # Performance patterns
        log_info "Performance patterns:"
        grep -o "Processing.*seconds" "$LOGS_DIR/processor.log" | \
            tail -10 || true

        # Memory usage patterns
        log_info "Memory usage patterns:"
        grep -o "Memory.*MB" "$LOGS_DIR/processor.log" | \
            tail -10 || true
    else
        log_warn "No processor logs found at $LOGS_DIR/processor.log"
    fi
}

# Generate comprehensive report
generate_report() {
    log_info "Generating comprehensive diagnostic report..."

    REPORT_FILE="$LOGS_DIR/diagnostic_report_$(date +%Y%m%d_%H%M%S).json"

    cat > "$REPORT_FILE" << EOF
{
    "diagnostic_report": {
        "timestamp": "$(date -Iseconds)",
        "system_info": {
            "hostname": "$(hostname)",
            "os": "$(uname -a)",
            "python_version": "$(python3 --version)",
            "nifi_home": "$NIFI_HOME",
            "user": "$(whoami)"
        },
        "resource_usage": {
            "disk_usage": "$(df -h $NIFI_HOME | tail -1)",
            "memory_info": "$(free -h | grep Mem)",
            "cpu_info": "$(nproc) cores",
            "load_average": "$(uptime | awk -F'load average:' '{print $2}')"
        },
        "configuration": {
            "scripts_directory": "$SCRIPTS_DIR",
            "logs_directory": "$LOGS_DIR",
            "working_directory": "$WORKING_DIR",
            "environment_variables": {
                "LOG_LEVEL": "${LOG_LEVEL:-INFO}",
                "PYTHONPATH": "${PYTHONPATH:-not_set}",
                "MONITORING_ENABLED": "${MONITORING_ENABLED:-false}"
            }
        }
    }
}
EOF

    log_info "Report generated: $REPORT_FILE"
}

# Cleanup old logs and temporary files
cleanup_environment() {
    log_info "Cleaning up environment..."

    # Remove old logs (older than 7 days)
    find "$LOGS_DIR" -name "*.log" -mtime +7 -delete 2>/dev/null || true

    # Remove old temporary files
    find "$WORKING_DIR" -name "test_*" -mtime +1 -delete 2>/dev/null || true

    # Clear Python cache
    find "$SCRIPTS_DIR" -name "__pycache__" -type d -exec rm -rf {} + 2>/dev/null || true
    find "$SCRIPTS_DIR" -name "*.pyc" -delete 2>/dev/null || true

    log_info "Cleanup completed"
}

# Main execution
main() {
    log_info "Starting ExecuteStreamCommand diagnostic suite..."

    case "${1:-all}" in
        "system")
            check_system_requirements
            ;;
        "config")
            test_executestream_config
            ;;
        "performance")
            performance_benchmark
            ;;
        "memory")
            detect_memory_leaks
            ;;
        "network")
            test_network_connectivity
            ;;
        "logs")
            analyze_logs
            ;;
        "cleanup")
            cleanup_environment
            ;;
        "all"|*)
            check_system_requirements
            test_executestream_config
            performance_benchmark
            detect_memory_leaks
            test_network_connectivity
            analyze_logs
            generate_report
            cleanup_environment
            ;;
    esac

    log_info "Diagnostic suite completed"
}

# Execute main function
main "$@"

Best practices checklist

# ExecuteStreamCommand best practices checklist

## Setup and configuration

### Environment setup
- Python virtual environment configured correctly
- Dependencies installed with locked versions (requirements.txt)
- Directory structure organized (/scripts, /logs, /working, /config)
- File and directory permissions configured appropriately
- Environment variables configured (LOG_LEVEL, PYTHONPATH, etc.)

### NiFi configuration
- Command Path points to the correct executable (/usr/bin/python3)
- Command Arguments include the full script path
- Working Directory set correctly
- Ignore STDIN/STDOUT configured appropriately (false for both)
- Processor timeout configured for the specific workload

### Security configuration
- Executable scripts have appropriate permissions (755)
- Temporary working directory protected
- External credentials managed via environment variables
- Input validation implemented to prevent injection
- Audit logging enabled for compliance

## Development best practices

### Script structure
- Correct shebang line (#!/usr/bin/env python3)
- Structured logging with appropriate levels
- Complete error handling with try/except blocks
- Graceful exit codes (0 for success, 1+ for errors)
- Robust stdin/stdout handling

### Code quality
- Type hints used for parameters and return values
- Complete docstrings for public classes and methods
- Full input validation before processing
- Memory-efficient handling of large datasets
- Unit tests implemented for business-critical logic

### Performance optimization
- Batch processing for large data volumes
- Connection pooling for database/API calls
- Caching implemented where appropriate
- Memory profiling performed to identify leaks
- Async processing for I/O-intensive operations

## Operations and monitoring

### Logging and debugging
- Log levels configurable via environment
- Structured logging (JSON format) for automated parsing
- Sensitive data redaction in logs
- Log file rotation configured
- Debug logging available for troubleshooting

### Monitoring and alerting
- Performance metrics collected (processing time, memory usage)
- Health checks implemented
- Alerts configured for error rates and performance degradation
- Monitoring dashboards set up (Grafana/Kibana)
- SLA monitoring for critical processes

### Error handling and recovery
- Circuit breaker pattern for external services
- Retry logic with exponential backoff
- Dead letter queue for failed records
- Partial processing support for batch operations
- Rollback mechanisms for transactional operations

## Security and compliance

### Data protection
- Encryption for sensitive data at rest and in transit
- Automatic PII detection and redaction
- Data classification implemented
- Access controls configured appropriately
- Complete audit trail for compliance

### Network security
- TLS/SSL for external API calls
- Network segmentation respected
- Firewall rules configured
- VPN connectivity for external services
- Automated certificate management

### Compliance frameworks
- GDPR compliance verified for EU data
- HIPAA compliance for healthcare data
- SOC2 controls implemented
- Data retention policies enforced
- Privacy-by-design principles followed

## Deployment and maintenance

### CI/CD pipeline
- Automated test pipeline set up
- Code quality gates (linting, security scanning)
- Automated deployment process
- Environment-specific configurations
- Documented rollback procedures

### Backup and recovery
- Automated backup scripts
- Configuration backups
- Disaster recovery plan tested
- RTO/RPO requirements defined
- Documented restore procedures

### Documentation
- README with setup instructions
- API documentation for custom endpoints
- Complete troubleshooting guide
- Up-to-date architecture diagrams
- Complete operational runbook

## Testing strategy

### Unit tests
- Test coverage >= 80% for business logic
- Mock objects for external dependencies
- Edge cases tested (empty input, malformed data)
- Error conditions simulated and tested
- Performance regression tests

### Integration tests
- End-to-end testing of NiFi flows
- Integration with external services tested
- Database connectivity tested
- Error handling integration verified
- Load testing for expected volumes

### Performance tests
- Baseline performance metrics established
- Load testing performed regularly
- Memory leak testing completed
- Scalability testing for projected growth
- Stress testing for failure scenarios
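The checklist's "retry logic with exponential backoff" item can be sketched in a few lines. This is a generic illustration, not the guide's own helper; the injectable `sleep` parameter exists only to make the delays testable:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry func with exponentially growing delays: 0.5s, 1s, 2s, ..."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")
```

In an ExecuteStreamCommand script this would typically wrap the external API or database call, with the final exception mapped to a non-zero exit code so NiFi routes the FlowFile to the failure relationship.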

Conclusions and next steps

Enterprise implementation recap

This comprehensive guide has covered the critical aspects of implementing ExecuteStreamCommand in enterprise-grade environments:

  1. Robust architecture: stdin/stdout communication pattern with complete lifecycle management
  2. Advanced Python integration: enterprise-ready scripts with error handling, logging, and monitoring
  3. Microservice patterns: orchestration of external services with resilience and circuit breakers
  4. Security and compliance: encryption, audit trails, input validation, and GDPR/HIPAA compliance
  5. Operational excellence: monitoring, alerting, troubleshooting, and performance optimization

Best practices implemented

Enterprise readiness

  • Granular error handling: per-severity error management with automatic recovery
  • Security hardening: input validation, encryption, and audit trails for compliance
  • Performance optimization: batch processing, caching, async operations
  • Comprehensive monitoring: system and application metrics with automatic alerting

Operational excellence

  • Structured logging: JSON logging for automated parsing and correlation
  • Diagnostic tools: a complete suite for troubleshooting and performance tuning
  • Automation: CI/CD pipelines, automated backups, recovery procedures
  • Documentation: complete runbooks, architecture diagrams, best practices

Roadmap for future extensions

The ExecuteStreamCommand setup described here is designed to be easily extended with:

  • ML/AI integration: TensorFlow/PyTorch support for real-time inference
  • Kubernetes orchestration: containerized deployment with auto-scaling
  • Multi-cloud support: integration with AWS/Azure/GCP services
  • Advanced analytics: real-time stream analytics with Apache Kafka

This enterprise implementation of ExecuteStreamCommand provides a solid foundation for processing mission-critical data in production NiFi environments, ensuring security, performance, and compliance.


Master ExecuteStreamCommand and turn NiFi into an enterprise-grade data processing platform! 🚀