Transformed EFD¶
This guide provides detailed technical information about the transformed EFD service architecture, class relationships, and code organization for developers who need to understand, modify, or extend the codebase.
Quick Reference for Experienced Developers¶
Note
For developers familiar with the system: use this quick reference to jump straight to what you need; detailed explanations follow in the sections below.
Core Classes:
Transform: Main processing engine (transform.py)
Summary: Statistical processing (summary.py)
ConfigModel: Configuration validation (config_model.py)
QueueManager: Task scheduling (queue_manager.py)
Key Extension Points:
Add transformation functions: Summary class methods
Add data sources: Create new DAO classes inheriting from DBBase
Add output formats: Extend _process_column() method
Add processing logic: Override _compute_column_value()
Common Development Tasks:
Add new statistical function: Add method to Summary class
Add new instrument: Create config file + schema generation
Modify data processing: Edit Transform.process_interval()
Add new database table: Create DAO class + update schemas
Testing Commands:
python -m pytest tests/
python -m pytest tests/transformed_efd/test_summary.py -v
Architecture Overview¶
The transformed EFD service is a configuration-driven data processing pipeline that transforms raw Engineering and Facilities Database (EFD) telemetry into structured, queryable metrics organized by exposure and visit timespans.
The core architectural principle is that everything stems from configuration files. This design permeates every aspect of the system:
Data Sources: EFD topics and fields are defined in YAML
Transformations: Statistical functions and their parameters are specified in config
Output Schema: Database tables and column structures are generated from config
Processing Logic: Exposure/visit boundary processing is configured, not hardcoded
Instrument Support: New instruments require only configuration changes
Key Architectural Principles:
Configuration-Driven: All transformations, schemas, and processing logic defined in YAML files
Separation of Concerns: Business logic (YAML) separated from execution engine (Python)
Timespan-Based Processing: Data processed only within exposure/visit boundaries
Extensible Design: New instruments, metrics, and functions added through configuration
Schema Generation: Database schemas automatically generated from configuration definitions
High-Level Architecture Flow¶
The following diagram shows the overall data flow and component relationships:
flowchart TD
A[YAML Config Files] --> B[ConfigModel Validation]
B --> C[Schema Generation]
C --> D[Database Tables]
E[EFD InfluxDB] --> F[InfluxDbDao]
G[LSST Butler] --> H[ButlerDao]
F --> I[Transform Engine]
H --> I
B --> I
I --> J[Summary Statistics]
J --> K[Result Storage]
K --> L[Exposure/Visit EFD Tables]
M[Task Queue] --> N[QueueManager]
N --> I
O[CLI Interface] --> P[TransformEfd Main]
P --> N
P --> I
style A fill:#e1f5fe
style I fill:#f3e5f5
style L fill:#e8f5e8
Core Components¶
Configuration Layer (config_model.py)¶
Pydantic models that define and validate YAML configuration files:
Field Model
Defines individual EFD field specifications with data types, units, and validation rules
Handles field mapping between EFD topic names and output column names
Supports field transformations and unit conversions
Validates data types (float, int, string, boolean) with proper error handling
Topic Model
Groups related EFD fields under a single topic specification
Defines query parameters (time ranges, aggregation settings, packed series handling)
Maps EFD topic names to database table structures
Handles topic-level configuration like server-side aggregation preferences
Column Model
Complete transformation definition including name, function, target tables, and aggregation settings
Links statistical functions to specific EFD data sources
Defines output schema structure and data types
Handles column-level configuration like null value handling and precision settings
ConfigModel
Top-level configuration container that orchestrates all processing
Validates entire configuration against schema requirements
Manages configuration versioning and compatibility
Provides configuration introspection and debugging capabilities
Configuration Flow: YAML files → Pydantic validation → Database schema generation → Runtime processing
This layer handles:
Configuration validation before processing begins
Database schema generation from config definitions
Type safety throughout the transformation pipeline
Runtime configuration changes without code modifications
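For orientation, here is a minimal sketch of the validation step; the constructor call is an assumption based on standard Pydantic conventions, and read_config() in transform_efd.py is the authoritative loader.
import yaml

from lsst.consdb.transformed_efd.config_model import ConfigModel

with open("config_latiss.yaml") as f:
    raw = yaml.safe_load(f)

config = ConfigModel(**raw)  # raises pydantic.ValidationError on a bad config
print(config.version, len(config.columns))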
Data Access Layer (dao/)¶
Database access objects for different data sources:
DBBase (base.py)
Abstract base class providing common database operations and connection management
Implements SQLAlchemy engine creation with NullPool for connection management
Provides transaction management with automatic commit handling
Handles database-specific SQL dialect differences (SQLite and PostgreSQL)
Includes upsert and bulk insert operations with chunked processing
Provides query execution methods (fetch_all_dict, fetch_one_dict, fetch_scalar)
InfluxDbDao (influxdb.py)
EFD data querying with optimized batching and time-series specific operations
Implements efficient query construction for time-range based data retrieval with server-side aggregation
Handles packed series reconstruction using _merge_packed_time_series method
Manages InfluxDB connection parameters via environment variables (EFD_HOST, EFD_USERNAME, etc.)
Supports field batching to avoid query complexity limits (max_fields_per_query=100)
Provides both regular time series queries and packed time series queries
ButlerDao (butler.py)
Exposure/visit metadata retrieval from LSST Butler system
Queries exposure and visit information within specified time ranges using timespan overlaps
Handles Butler repository configuration through Butler object initialization
Provides metadata filtering by instrument and time period
Converts Butler resultsets to pandas DataFrames and dictionaries
Handles EmptyQueryResultError gracefully by returning empty lists
ExposureEfdDao, VisitEfdDao
Output database operations for storing transformed EFD data in PostgreSQL
Implements upsert operations using execute_upsert method from DBBase
Manages both regular and unpivoted table variants (exposure_efd, exposure_efd_unpivoted)
Handles primary key validation and missing column detection
Provides row counting and retrieval by ID operations
Uses chunked processing for large dataset inserts (commit_every=100)
TransformdDao (transformd.py)
Task scheduling metadata management for the transformed_efd_scheduler table
Manages processing queue state with status tracking (pending, running, completed, failed)
Handles task lifecycle methods: task_started, task_completed, task_failed, task_retries_increment
Provides task selection methods: select_next, select_last, select_recent, select_queued
Implements orphaned task detection and cleanup via fail_orphaned_tasks
Manages execution time tracking and retry counting for failed tasks
Scheduler Tables and Task Management¶
The transformed EFD system uses a dedicated efd_scheduler schema to manage processing tasks across all instruments. This schema contains separate tables for each instrument, tracking task lifecycle, execution status, and providing coordination for distributed processing.
Scheduler Schema Structure
The efd_scheduler schema contains individual tables for each instrument:
efd_scheduler.latiss: LATISS instrument task management
efd_scheduler.lsstcam: LSSTCam instrument task management
efd_scheduler.lsstcomcam: LSSTComCam instrument task management
Each instrument table has the following columns:
id: Auto-incrementing primary key for unique task identification
start_time: Start time of the transformation interval (required)
end_time: End time of the transformation interval (required)
timewindow: Time window expansion in minutes for query boundaries
status: Task processing status (pending, idle, running, completed, failed, stale)
process_start_time: Timestamp when task processing began
process_end_time: Timestamp when task processing completed
process_exec_time: Total execution time in seconds
exposures: Number of exposures processed in this task
visits1: Number of visits processed in this task
retries: Number of retry attempts for failed tasks
error: Error message if task failed
butler_repo: Butler repository path used for this task
created_at: Timestamp when task record was created
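For illustration only, a SQLAlchemy Table mirroring these columns might look like the sketch below; the column types are assumptions, and the generated files under schemas/ are authoritative.
from sqlalchemy import Column, DateTime, Float, Integer, MetaData, String, Table

metadata = MetaData(schema="efd_scheduler")

latiss_tasks = Table(
    "latiss",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("start_time", DateTime, nullable=False),
    Column("end_time", DateTime, nullable=False),
    Column("timewindow", Integer),       # minutes of query-boundary expansion
    Column("status", String(20)),        # pending/idle/running/completed/failed/stale
    Column("process_start_time", DateTime),
    Column("process_end_time", DateTime),
    Column("process_exec_time", Float),  # seconds
    Column("exposures", Integer),
    Column("visits1", Integer),
    Column("retries", Integer),
    Column("error", String),
    Column("butler_repo", String),
    Column("created_at", DateTime),
)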
Task Lifecycle and Status Management
Tasks progress through the following lifecycle with different initial statuses based on execution mode:
Task Creation:
Jobs: Tasks created with status “idle” (waiting for processing)
CronJobs: Tasks created with status “pending” (waiting for processing)
Task Selection: Available tasks are selected using select_next() or similar methods
Task Execution: Status updated to “running” via task_started()
Task Completion: Status updated to “completed” via task_completed() with execution metrics
Task Failure: Status updated to “failed” via task_failed() with error details
Task Retry: Failed tasks can be retried with exponential backoff via task_retries_increment()
Task Status Meanings:
pending: Default status for CronJob tasks, ready for processing
idle: Status for Job tasks, ready for processing
running: Task currently being processed
completed: Task finished successfully
failed: Task failed with error (eligible for retry)
stale: Task marked as stale after 72 hours (no longer eligible for retry)
Task Creation Process
The QueueManager.create_tasks() method:
Divides time ranges into processing intervals (default: 5-minute chunks)
Checks for existing tasks to avoid duplicates
Creates tasks with appropriate time boundaries and Butler repository references
Uses bulk insert operations for efficient database operations
Handles both one-time job creation and continuous cronjob task generation
Task Selection and Processing
The system provides multiple task selection strategies:
select_next(): Gets the next available task for processing
select_last(): Retrieves the most recently created task
select_recent(): Gets tasks within a recent time window
select_queued(): Finds tasks with specific status (pending, idle, etc.)
waiting_tasks(): Gets tasks ready for processing
failed_tasks(): Retrieves failed tasks eligible for retry
Orphaned Task Management
The system includes orphaned task detection and cleanup:
fail_orphaned_tasks(): Identifies and marks abandoned tasks as failed
Orphaned Task Criteria: Tasks in “running” status for extended periods
Automatic Cleanup: Prevents resource leaks and enables task recovery
Retry Logic: Failed tasks can be retried with exponential backoff (2.8^retries hours); see the sketch below
Stale Task Management: Tasks older than 72 hours are marked as “stale” and no longer eligible for retry
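The retry rule can be expressed roughly as follows. This is a sketch of the policy only, with task fields assumed to be UTC-naive datetimes; _get_retryable_tasks() in transform_efd.py implements the real logic.
from datetime import datetime, timedelta

def is_retryable(task: dict, now: datetime | None = None) -> bool:
    """Sketch: exponential backoff with a 72-hour staleness cutoff."""
    now = now or datetime.utcnow()
    # Tasks older than 72 hours are stale and never retried.
    if now - task["created_at"] > timedelta(hours=72):
        return False
    # Otherwise wait 2.8**retries hours after the last failure.
    backoff = timedelta(hours=2.8 ** task["retries"])
    return now - task["process_end_time"] >= backoff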
Task Coordination Features
Duplicate Prevention: Checks for existing tasks before creation
Time Window Management: Handles overlapping time windows and gaps
Butler Repository Tracking: Links tasks to specific Butler repository versions
Execution Metrics: Tracks processing counts, execution times, and success rates
Error Handling: Comprehensive error logging and retry mechanisms
Transformation Engine¶
The transformation engine is the core of the system, orchestrating the entire data processing pipeline from EFD telemetry to structured database records.
Main Transformation Process Flow
The transformation process follows this sequence:
- Time Interval Processing (process_interval)
Retrieves exposures and visits from Butler within the time window
Calculates topic query intervals based on exposure/visit timespans
Processes each topic and column according to configuration
- Topic Optimization (_map_topics)
Groups columns by topic, packed_series, start_offset, and aggregation settings
Minimizes duplicate EFD queries by batching similar requests
Enables server-side aggregation when configured (pre_aggregate_interval)
- EFD Data Retrieval (_query_efd_values)
Queries InfluxDB for telemetry data within calculated time windows
Handles both regular time series and packed time series data
Applies start_offset adjustments for time window modifications
- Column Value Computation (_compute_column_value)
Filters EFD data to exact exposure/visit time boundaries
Applies statistical transformations via Summary class
Handles time offset adjustments and data validation
- Result Storage (_store_results)
Stores computed metrics in PostgreSQL tables (exposure_efd, visit1_efd)
Supports both pivoted and unpivoted table formats
Uses chunked processing for large datasets
Transform (transform.py)
Key methods and their responsibilities:
process_interval(): Main entry point that orchestrates the entire transformation pipeline
_map_topics(): Critical optimization that groups similar queries to minimize EFD roundtrips
_compute_column_value(): Core computation engine that applies transformations within timespans
_process_topic(): Handles individual topic processing and data retrieval
_process_column(): Manages column-level processing and result aggregation
get_schema_by_instrument(): Maps instrument names to database schemas
Summary (summary.py)
Statistical processing framework with the following transformation functions:
mean(): Arithmetic mean ignoring NaN values
stddev(): Standard deviation with configurable degrees of freedom (ddof parameter)
max()/min(): Maximum/minimum values ignoring NaN
rms_from_polynomial_fit(): RMS after polynomial detrending (degree and fit_basis parameters)
most_recent_value(): Most recent scalar value with optional start_offset
apply(): Generic method dispatcher with comprehensive error handling (usage sketched below)
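A hedged usage sketch follows; the constructor call is an assumption (the class stores a raw DataFrame internally), so check summary.py for the actual signature.
import numpy as np
import pandas as pd

from lsst.consdb.transformed_efd.summary import Summary

# Assumed constructor: a DataFrame of EFD samples for one exposure/visit window.
series = pd.DataFrame({"domeAirTemperature": [11.8, 12.1, np.nan, 12.4]})
summary = Summary(series)

# apply() dispatches by function name and returns None on invalid operations.
print(summary.apply("mean"))  # NaN-safe arithmetic mean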
Developer Extension Points
When adding new functionality that may not fit the current workflow:
- New Transformation Functions
Add method to Summary class in summary.py
Ensure proper NaN handling and error management
Add comprehensive tests for edge cases
- New Data Sources
Create new DAO class inheriting from DBBase
Add data retrieval methods to Transform class
Update _map_topics() if new grouping logic is needed
- New Output Formats
Extend _process_column() method to handle new table types
Add new storage methods similar to _process_exposures()
Update schema generation if new table structures are needed
- Custom Processing Logic
Override _compute_column_value() for specialized computation
Modify _map_topics() for custom query optimization
Extend _process_interval() for additional processing steps
- Performance Optimizations
Adjust commit_every parameter for batch processing
Modify _map_topics() grouping logic for better query batching
Implement custom caching in Summary class if needed
Critical Integration Points
Configuration Validation: All new features must work with existing Pydantic models
Error Handling: Use @handle_processing_errors decorator for consistent error management
Logging: Maintain detailed logging for debugging and monitoring
Timespan Processing: All transformations must respect exposure/visit boundaries
Database Compatibility: Ensure new features work with existing schema generation
Application Entry Point¶
Transform EFD (transform_efd.py)
The main application entry point that orchestrates the entire transformed EFD workflow. This module handles CLI argument parsing, component initialization, task coordination, and graceful shutdown management.
Core Workflow Functions
main(): Primary application entry point that initializes all components and coordinates execution
build_argparser(): Constructs comprehensive CLI interface with required and optional arguments
get_logger(): Configures structured logging with console and file output, UTC timestamps
read_config(): Loads and validates YAML configuration files using Pydantic models
Task Processing Functions
_process_task(): Processes individual tasks with error handling, status updates, and retry logic
process_tasks(): Executes task batches with graceful shutdown support and progress tracking
handle_job(): Manages one-time job execution with custom time windows and resume capabilities
handle_cronjob(): Handles periodic cronjob execution with automatic task creation and retry management
Utility Functions
parse_utc_naive(): Converts ISO datetime strings to UTC-naive datetime objects
_to_astropy_time(): Converts datetime objects to Astropy Time objects in UTC scale
_get_retryable_tasks(): Implements exponential backoff logic for failed task retries
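The first two helpers can be sketched as follows, assuming ISO-8601 inputs; the actual implementations may handle additional edge cases.
from datetime import datetime, timezone

from astropy.time import Time

def parse_utc_naive(iso_string: str) -> datetime:
    """Parse an ISO-8601 string into a UTC-naive datetime."""
    dt = datetime.fromisoformat(iso_string)
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt

def _to_astropy_time(dt: datetime) -> Time:
    """Convert a (UTC-naive) datetime to an Astropy Time in UTC scale."""
    return Time(dt, scale="utc")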
Component Initialization Sequence
CLI Parsing: Validates command-line arguments and configuration requirements
Logging Setup: Configures structured logging with environment variable control
Butler Initialization: Creates Butler instance for metadata access
EFD Connection: Establishes InfluxDB connection with optimized query parameters
Transform Setup: Initializes main transformation processor with all dependencies
Queue Manager: Creates task scheduling and coordination system
Orphaned Task Cleanup: Handles recovery from previous interrupted executions
Execution Modes
Job Mode: One-time processing of specific time ranges
Requires start_time and end_time parameters
Supports resume functionality for interrupted jobs
Creates tasks for the specified time window
Cronjob Mode: Continuous periodic processing
Automatically creates tasks based on current time
Handles failed task retries with exponential backoff
Manages orphaned task detection and cleanup
Error Handling and Recovery
Signal Handling: Graceful shutdown on SIGTERM/SIGINT with current task completion
Task State Management: Tracks task lifecycle (pending → running → completed/failed)
Retry Logic: Exponential backoff for failed tasks with configurable parameters
Orphaned Task Recovery: Automatic detection and cleanup of abandoned tasks
Batch Processing: Configurable batch sizes with shutdown checks between batches
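A common shape for the signal handling described above is sketched here; the handler in transform_efd.py may differ in detail.
import signal

shutdown_requested = False

def _request_shutdown(signum, frame):
    # Record the request; the batch loop finishes the current task first.
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGTERM, _request_shutdown)
signal.signal(signal.SIGINT, _request_shutdown)

# Inside the batch loop, between tasks:
# if shutdown_requested:
#     break  # complete the current task, start no new ones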
Management Utilities¶
QueueManager (queue_manager.py)
Task scheduling and workflow management for distributed processing
Manages processing queues, task priorities, and resource allocation
Handles task dependencies and parallel execution coordination
Provides monitoring, logging, and debugging capabilities for workflow execution
Implements fault tolerance and task retry mechanisms
generate_schema_from_config.py
Automated schema generation from YAML configuration files
Creates PostgreSQL table definitions based on Column specifications
Handles schema versioning and migration management
Validates generated schemas against database constraints and best practices
Provides schema comparison and diff capabilities for configuration changes
Class Relationships¶
The transformed EFD system architecture consists of several interconnected layers that work together to process telemetry data from raw EFD streams into structured database records.
System Architecture Overview
The system follows a layered architecture with clear separation of concerns:
Application Layer: Main entry point that orchestrates the entire workflow (transform_efd.py)
Configuration Layer: Pydantic models that define and validate YAML configurations
Data Access Layer: DAOs that abstract database operations and external service interactions
Transformation Layer: Core processing engine that orchestrates data flow and transformations
Management Layer: Task scheduling and workflow coordination
Integration Layer: CLI interfaces and external service connections
Detailed Class Relationships
classDiagram
class Transform {
-butler_dao: ButlerDao
-efd: InfluxDbDao
-config: Dict
-commit_every: int
+process_interval()
+_compute_column_value()
+_map_topics()
+_store_results()
}
class Summary {
-_raw_dataframe: DataFrame
-_data_array: ndarray
+mean()
+stddev()
+max()
+min()
+apply()
}
class ConfigModel {
+version: str
+columns: List[Column]
}
class Column {
+name: str
+function: str
+tables: List[str]
+topics: List[Topic]
+function_args: Dict
}
class Topic {
+name: str
+fields: List[Field]
}
class Field {
+name: str
}
class InfluxDbDao {
-url: str
-database_name: str
+select_time_series()
+select_packed_time_series()
}
class ButlerDao {
-butler: Butler
+exposures_by_period()
+visits_by_period()
}
class DBBase {
-engine: Engine
-db_uri: str
+get_db_engine()
+execute_upsert()
+fetch_all_dict()
}
class ExposureEfdDao {
-tbl: Table
+upsert()
+get_by_exposure_id()
+count()
}
class VisitEfdDao {
-tbl: Table
+upsert()
+get_by_visit_id()
+count()
}
class TransformdDao {
-tbl: Table
+task_started()
+task_completed()
+task_failed()
+select_next()
+bulk_insert()
+select_by_id()
+count()
+fail_orphaned_tasks()
}
class QueueManager {
-dao: TransformdDao
-instrument: str
+create_tasks()
+get_next_task()
+mark_task_completed()
}
class TransformEfd {
+main()
+build_argparser()
+get_logger()
+read_config()
+_process_task()
+process_tasks()
+handle_job()
+handle_cronjob()
}
%% Core transformation relationships
Transform --> ButlerDao : "creates and uses"
Transform --> InfluxDbDao : "queries EFD data"
Transform --> Summary : "instantiates for computations"
Transform --> ConfigModel : "validates configuration"
%% Configuration model relationships
ConfigModel --> Column : "contains list of"
Column --> Topic : "references list of"
Topic --> Field : "contains list of"
%% Data access inheritance
ExposureEfdDao --> DBBase : "inherits from"
VisitEfdDao --> DBBase : "inherits from"
TransformdDao --> DBBase : "inherits from"
%% Application orchestration relationships
TransformEfd --> Transform : "initializes and coordinates"
TransformEfd --> QueueManager : "initializes and manages"
TransformEfd --> Butler : "creates Butler instance"
TransformEfd --> InfluxDbDao : "creates EFD connection"
TransformEfd --> ConfigModel : "validates configuration"
%% Queue management relationships
QueueManager --> TransformdDao : "manages via"
QueueManager --> Transform : "coordinates execution"
%% Database operations
Transform --> ExposureEfdDao : "stores results via"
Transform --> VisitEfdDao : "stores results via"
The system consists of the following key classes and their relationships:
Core Classes
Transform: Main processing engine with dependencies on ButlerDao, InfluxDbDao, Summary, and ConfigModel
Summary: Statistical processing framework with methods for mean, stddev, max, min, rms_from_polynomial_fit, most_recent_value, apply
ConfigModel: Pydantic model containing version and list of Column objects
Column: Configuration object with name, function, tables, topics, and function_args
Topic: Configuration object with name and list of Field objects
Field: Simple configuration object with name attribute
Data Access Classes
DBBase: Abstract base class providing database engine management, connection handling, and common operations
InfluxDbDao: Inherits from DBBase, handles InfluxDB connections and time-series queries
ButlerDao: Inherits from DBBase, manages Butler repository access for exposure/visit metadata
ExposureEfdDao: Inherits from DBBase, handles exposure_efd table operations
VisitEfdDao: Inherits from DBBase, handles visit1_efd table operations
TransformdDao: Inherits from DBBase, manages task scheduling metadata
Management Classes
QueueManager: Task scheduling and workflow coordination using TransformdDao
TransformEfd: Main application entry point that orchestrates all components
Key Relationships
Transform creates and uses ButlerDao for metadata access
Transform queries InfluxDbDao for EFD telemetry data
Transform instantiates Summary objects for statistical computations
Transform validates ConfigModel for configuration-driven processing
ConfigModel contains lists of Column objects
Column objects reference lists of Topic objects
Topic objects contain lists of Field objects
All specialized DAOs inherit from DBBase for consistent database operations
TransformEfd initializes and coordinates Transform and QueueManager
TransformEfd creates Butler instances and InfluxDbDao connections
QueueManager manages tasks via TransformdDao
Transform stores results via ExposureEfdDao and VisitEfdDao
Key Relationship Patterns
1. Composition and Dependency Injection
Transform class receives all its dependencies (Butler, InfluxDbDao, config) through constructor injection
This enables flexible testing and configuration without tight coupling
2. Configuration-Driven Architecture
ConfigModel validates YAML configurations and provides structured access to Column, Topic, and Field definitions
Transform class uses configuration to drive all processing logic without hardcoded transformations
3. Data Access Object Pattern
All DAOs inherit from DBBase, providing consistent database operations
Specialized DAOs (ExposureEfdDao, VisitEfdDao, TransformdDao) handle specific table operations
Transform class uses DAOs for all data persistence operations
4. Strategy Pattern for Transformations
Summary class provides pluggable transformation functions (mean, stddev, etc.)
Transform class dynamically instantiates Summary objects and calls transformation methods based on configuration
5. Template Method Pattern
Transform.process_interval() defines the high-level processing algorithm
Specific processing steps (_map_topics, _compute_column_value, etc.) can be overridden for custom behavior
6. Observer Pattern via Logging
All classes use structured logging for monitoring and debugging
@handle_processing_errors decorator provides consistent error handling across Transform methods
Data Flow Relationships
Application Flow: CLI arguments → TransformEfd initialization → Component coordination
Configuration Flow: YAML files → ConfigModel validation → Transform initialization
Metadata Flow: Butler → ButlerDao → Transform for exposure/visit information
Telemetry Flow: InfluxDB → InfluxDbDao → Transform for EFD data retrieval
Transformation Flow: EFD data → Summary → Transform for statistical processing
Storage Flow: Transform → DAOs → PostgreSQL for result persistence
Scheduling Flow: QueueManager → TransformdDao → Transform for task coordination
Integration Points for Extensions
New Data Sources: Add new DAO classes inheriting from DBBase
New Transformations: Add methods to Summary class
New Output Formats: Extend Transform._store_results() method
Custom Processing: Override Transform._compute_column_value() method
Alternative Scheduling: Replace QueueManager with custom task coordination
Service Operation Flow¶
The transformed EFD service runs in two ways:
Direct Python: Run transform_efd.py with command-line arguments
Docker Container: Run in a Kubernetes cluster
Both methods use the same code. The diagram shows the processing flow:
flowchart TD
Start([Service Start]) --> ExecutionMode{Execution Method?}
ExecutionMode -->|Direct Python| ParseCLI[Parse CLI Arguments<br/>- Config file<br/>- Instrument<br/>- Time range<br/>- Database connections]
ExecutionMode -->|Containerized| LoadEnv[Load Environment Variables<br/>- CONFIG_FILE<br/>- INSTRUMENT<br/>- BUTLER_REPO<br/>- CONSDB_URL<br/>- EFD<br/>- TIMEDELTA]
ParseCLI --> ValidateConfig[Validate Configuration<br/>- Load YAML config<br/>- Validate with Pydantic<br/>- Check schema compatibility]
LoadEnv --> ValidateConfig
ValidateConfig --> InitLogging[Initialize Logging<br/>- Setup structured logging<br/>- Configure file/console output<br/>- Set log levels]
InitLogging --> InitComponents[Initialize Core Components]
subgraph InitComponents[Component Initialization]
InitButler[Initialize ButlerDAO<br/>- Connect to Butler repo<br/>- Setup metadata access]
InitEFD[Initialize InfluxDbDAO<br/>- Connect to EFD InfluxDB<br/>- Setup authentication<br/>- Configure query limits]
InitTransform[Initialize Transform<br/>- Load configuration<br/>- Setup processing pipeline<br/>- Configure commit settings]
InitQueue[Initialize QueueManager<br/>- Connect to task database<br/>- Setup task scheduling<br/>- Check for orphaned tasks]
end
InitComponents --> ModeDecision{Execution Mode?}
ModeDecision -->|Job Mode| CreateJobTasks[Create Job Tasks<br/>- Parse start/end times<br/>- Generate time intervals<br/>- Create task records]
ModeDecision -->|Cronjob Mode| GetCronTasks[Get Cronjob Tasks<br/>- Check for pending tasks<br/>- Handle failed retries<br/>- Create new intervals]
CreateJobTasks --> ProcessTasks[Core Processing Loop]
GetCronTasks --> ProcessTasks
subgraph ProcessTasks[Core Processing Loop]
GetTask[Get Next Task<br/>- Query task queue<br/>- Select pending/idle task<br/>- Update status to 'running']
GetTask --> TaskExists{Task Available?}
TaskExists -->|No| WaitForTask[Wait for New Tasks<br/>- Sleep interval<br/>- Check for new tasks]
WaitForTask --> GetTask
TaskExists -->|Yes| QueryButler[Query Butler for Metadata<br/>- Get exposures in time range<br/>- Get visits in time range<br/>- Extract timespan boundaries]
QueryButler --> HasData{Data Found?}
HasData -->|No| LogEmpty[Log Empty Result<br/>- Update task status<br/>- Mark as completed]
LogEmpty --> GetTask
HasData -->|Yes| MapTopics[Map & Optimize Topics<br/>- Group columns by topic<br/>- Minimize EFD queries<br/>- Setup server aggregation]
MapTopics --> QueryEFD[Query EFD for Telemetry<br/>- Execute time series queries<br/>- Handle packed series<br/>- Apply time offsets]
QueryEFD --> ProcessColumns[Process Each Column<br/>- Filter to exposure/visit bounds<br/>- Apply statistical functions<br/>- Compute metric values]
ProcessColumns --> StoreResults[Store Results<br/>- Upsert to exposure_efd<br/>- Upsert to visit1_efd<br/>- Handle unpivoted tables]
StoreResults --> UpdateTask[Update Task Status<br/>- Mark task completed<br/>- Record processing counts<br/>- Log success metrics]
UpdateTask --> ErrorCheck{Processing Error?}
ErrorCheck -->|Yes| HandleError[Handle Processing Error<br/>- Log error details<br/>- Mark task failed<br/>- Increment retry count]
ErrorCheck -->|No| GetTask
HandleError --> GetTask
end
ProcessTasks --> ShutdownCheck{Shutdown Signal?}
ShutdownCheck -->|No| ProcessTasks
ShutdownCheck -->|Yes| GracefulShutdown[Graceful Shutdown<br/>- Complete current task<br/>- Update orphaned tasks<br/>- Close connections]
GracefulShutdown --> FinalSummary[Final Processing Summary<br/>- Log total statistics<br/>- Report completion status<br/>- Cleanup resources]
Processing Steps:
Start: Parse arguments or load environment variables, load the YAML config file, and initialize logging and components
Task Management: Job mode creates tasks for a specific time range; cronjob mode creates tasks continuously; orphaned tasks are cleaned up
Process Data: Get a task from the queue, query Butler for exposures/visits, query the EFD for telemetry data, apply transformations, and store results in the database
Error Handling: Log errors, retry failed tasks, and handle shutdown signals
Finish: Log statistics and clean up resources
Key Processing Methods¶
Transform.process_interval()¶
Main method that processes data for a time range:
Query Butler for exposures and visits
Group columns by topic to reduce queries
Process each topic and apply transformations
Store results in database
Update task status
Transform._map_topics()¶
Groups similar EFD queries to reduce database calls:
Groups columns by topic, packed_series, start_offset, and aggregation settings
Reduces queries from hundreds to 10-50 per processing run
Enables server-side aggregation when configured
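Conceptually, the grouping key looks like the sketch below; the attribute names are taken from the configuration fields discussed in this guide and may differ from the actual implementation.
from collections import defaultdict

def group_columns(columns):
    """Group columns so each group can be served by a single EFD query."""
    groups = defaultdict(list)
    for column in columns:
        for topic in column.topics:
            key = (
                topic.name,
                column.packed_series,
                getattr(column, "start_offset", None),
                getattr(column, "pre_aggregate_interval", None),
            )
            groups[key].append(column)
    return groups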
Transform._compute_column_value()¶
Applies statistical functions to time-series data:
Filter data to exposure/visit time boundaries
Handle missing data and NaN values
Apply statistical functions (mean, stddev, max, min, etc.)
Return computed values
Transform._query_efd_values()¶
Queries InfluxDB for telemetry data:
Handles regular and packed time series
Groups fields into batches (max 100 per query)
Applies time offsets
Manages connections and handles errors
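The field batching rule reduces to simple chunking, sketched here with the max_fields_per_query=100 limit noted earlier:
def field_batches(fields, max_fields_per_query=100):
    """Yield field-name chunks small enough for a single InfluxDB query."""
    for i in range(0, len(fields), max_fields_per_query):
        yield fields[i:i + max_fields_per_query]

# Example: 250 fields become three queries covering 100, 100, and 50 fields.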
Summary.apply()¶
Calls the correct statistical method:
Check for empty datasets
Call the requested method (mean, stddev, etc.)
Handle errors
Return None for invalid operations
QueueManager.create_tasks()¶
Creates processing tasks for time intervals:
Creates tasks in 5-minute chunks (default)
Checks for existing tasks to avoid duplicates
Sets initial status to ‘pending’
Links tasks to Butler repositories
Task status: pending → running → completed/failed → retry (if failed)
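The interval chunking can be sketched as follows; create_tasks() additionally deduplicates against existing tasks and records the Butler repository:
from datetime import datetime, timedelta

def make_intervals(start: datetime, end: datetime, minutes: int = 5):
    """Split [start, end) into consecutive processing intervals."""
    intervals = []
    cursor = start
    while cursor < end:
        upper = min(cursor + timedelta(minutes=minutes), end)
        intervals.append((cursor, upper))
        cursor = upper
    return intervals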
Execution Methods¶
Direct Python Execution¶
For development and testing:
python transform_efd.py \
-c config_latiss.yaml \
-i latiss \
-r s3://rubin-summit-users/butler.yaml \
-d postgresql://user:pass@host:port/db \
-E "efd_latiss" \
-m job \
-s 2024-01-01T00:00:00 \
-e 2024-01-01T01:00:00 \
-t 5 \
-w 1 \
-l /path/to/logfile.log \
-R
Required arguments:
-c, --config: YAML config file path
-i, --instrument: Instrument name (converted to lowercase)
-r, --repo: Butler repository path (default: s3://rubin-summit-users/butler.yaml)
-d, --db: Database connection string
-E, --efd: EFD connection string
-m, --mode: Execution mode (job or cronjob)
Optional arguments:
-s, --start: Start time in ISO format (required for job mode)
-e, --end: End time in ISO format (required for job mode)
-t, --timedelta: Processing interval in minutes (default: 5)
-w, --timewindow: Overlap window in minutes (default: 1)
-l, --logfile: Log file path (optional; logs to console if not specified)
-R, --resume: Resume pending tasks (flag, no value required)
Docker Container Execution¶
For execution in a Kubernetes cluster, please see the Operator Guide.
Secrets and Credentials Management¶
The transformed EFD service integrates with the default vault system for secure management of secrets, passwords, and connection credentials. This ensures that sensitive information is not exposed in configuration files or command-line arguments.
Database Credentials: PostgreSQL connection strings and authentication tokens are retrieved from the vault system
EFD Authentication: InfluxDB credentials and API keys are managed securely through the vault
Butler Repository Access: Butler repository authentication credentials are handled via vault integration
Default Passwords: System default passwords and service account credentials are centrally managed
Data Transformation Functions¶
Available in Summary class:
mean(): Arithmetic mean ignoring NaN values
stddev(): Standard deviation with configurable degrees of freedom
max()/min(): Maximum/minimum values
rms_from_polynomial_fit(): RMS after polynomial detrending
most_recent_value(): Most recent scalar value
Extension Patterns¶
Adding New Transformation Functions¶
Add method to Summary class in summary.py:
def median(self) -> float:
    """Calculate the median, ignoring NaN values."""
    values = self._get_numeric_values()
    if len(values) == 0:
        return np.nan
    return np.nanmedian(values)
Add comprehensive tests in tests/transformed_efd/test_summary.py
Update documentation with usage examples
Extending Configuration Options¶
Add field to appropriate Pydantic model in config_model.py
Update validation logic if needed
Modify schema generation in generate_schema_from_config.py
Update transformation logic to handle new option
Adding New Data Sources¶
Create new DAO class inheriting from DBBase
Implement data retrieval methods with proper error handling
Integrate with transformation pipeline in transform.py
Add tests for new data source integration
Error Handling Strategy¶
Decorator-based: @handle_processing_errors decorator for consistent error logging
Graceful Degradation: Continue processing when individual transformations fail
Comprehensive Logging: Detailed error context for debugging
Validation First: Pydantic models validate configurations before processing
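A decorator of the following shape is one way to realize this strategy; it is a sketch, not the project's actual @handle_processing_errors implementation.
import functools
import logging

log = logging.getLogger(__name__)

def handle_processing_errors(func):
    """Log failures with context and degrade gracefully instead of aborting."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            log.exception("Error in %s; continuing with remaining work", func.__name__)
            return None
    return wrapper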
Common Development Scenarios¶
This section provides complete end-to-end examples for typical development tasks.
Scenario 1: Adding a New Statistical Function¶
Goal: Add a median calculation function to the Summary class.
Complete workflow:
Add the method to Summary class:
# In summary.py
def median(self) -> float:
    """Calculate the median, ignoring NaN values."""
    values = self._get_numeric_values()
    if len(values) == 0:
        return np.nan
    return np.nanmedian(values)
Add comprehensive tests:
# In tests/transformed_efd/test_summary.py
def test_median(summary_instance):
    """Test median calculation with valid data."""
    assert summary_instance.median() == 3.0
Use in configuration:
- name: "dome_air_temp_median"
description: "Median dome air temperature during exposure."
function: "median"
datatype: float
ivoa: {"unit": "deg_C", "ucd": "phys.temperature;stat.median"}
tables: ["exposure_efd"]
packed_series: false
topics:
- name: lsst.sal.ESS.temperature
fields:
- name: domeAirTemperature
Scenario 2: Adding Support for a New Instrument¶
Goal: Add support for a new instrument called “NewCam”.
Complete workflow:
Create configuration file:
cp python/lsst/consdb/transformed_efd/config/config_latiss.yaml \
python/lsst/consdb/transformed_efd/config/config_newcam.yaml
Edit configuration for NewCam:
# In config_newcam.yaml
version: "1.0.0"
columns:
  - name: newcam_dome_temperature_mean
    tables: ["exposure_efd", "visit1_efd"]
    function: mean
    datatype: float
    ivoa: {"unit": "deg_C", "ucd": "phys.temperature;stat.mean"}
    description: Mean dome temperature for NewCam.
    packed_series: false
    topics:
      - name: lsst.sal.ESS.temperature
        fields:
          - name: domeAirTemperature
Generate schema:
python ./python/lsst/consdb/transformed_efd/generate_schema_from_config.py --instrument newcam
Create Alembic migration:
alembic -n efd_newcam revision --autogenerate -m "Add NewCam instrument support"
Test the configuration:
python -m pytest tests/transformed_efd/test_config_model.py -v
Scenario 3: Adding a New Data Source¶
Goal: Add support for querying a new external API for additional telemetry data.
Complete workflow:
Create new DAO class:
# In dao/new_api.py
from .base import DBBase

class NewApiDao(DBBase):
    """DAO for an external telemetry API (illustrative sketch)."""

    def __init__(self, api_url: str, api_key: str):
        # Note: DBBase may expect database connection parameters;
        # see base.py for the actual constructor contract.
        self.api_url = api_url
        self.api_key = api_key

    def get_telemetry_data(self, start_time: str, end_time: str):
        # Implementation for API calls (e.g., authenticated HTTP requests
        # returning a pandas DataFrame) goes here.
        pass
Integrate with Transform class:
# In transform.py, add to __init__
self.new_api = NewApiDao(api_url, api_key)

# Add a method to query the new data source
def _query_new_api_data(self, start_time, end_time):
    return self.new_api.get_telemetry_data(start_time, end_time)
Add tests:
# In tests/transformed_efd/test_new_api.py
def test_new_api_dao():
    dao = NewApiDao("http://api.example.com", "test-key")
    data = dao.get_telemetry_data("2023-01-01", "2023-01-02")
    assert data is not None
Scenario 4: Modifying Data Processing Logic¶
Goal: Add custom preprocessing for specific telemetry topics.
Complete workflow:
Override processing method:
# In transform.py, extend the Transform class
def _preprocess_telemetry_data(self, data, topic_name):
    if topic_name == "lsst.sal.ESS.temperature":
        # Apply temperature-specific preprocessing
        data = self._apply_temperature_corrections(data)
    return data
Integrate into processing pipeline:
# In _process_topic method
def _process_topic(self, topic_config, start_time, end_time):
    # Get raw data
    raw_data = self._query_efd_values(topic_config, start_time, end_time)
    # Apply preprocessing
    processed_data = self._preprocess_telemetry_data(raw_data, topic_config.name)
    # Continue with normal processing
    return self._compute_column_values(processed_data, topic_config)
Add configuration support:
# In config file, add preprocessing options
topics:
  - name: lsst.sal.ESS.temperature
    fields:
      - name: domeAirTemperature
        preprocessing:
          apply_temperature_correction: true
Scenario 5: Working with Scheduler Tables¶
Goal: Monitor and manage processing tasks in the scheduler tables.
Complete workflow:
Query task status:
# In your monitoring script
from lsst.consdb.transformed_efd.dao.transformd import TransformdDao

dao = TransformdDao(db_uri, instrument="latiss", schema="efd_scheduler")

# Get recent tasks
recent_tasks = dao.select_recent(limit=10)
for task in recent_tasks:
    print(f"Task {task['id']}: {task['status']} - {task['start_time']} to {task['end_time']}")
Monitor task processing:
# Check task metrics
total_tasks = dao.count()
failed_tasks = dao.select_queued(status="failed")
running_tasks = dao.select_queued(status="running")
stale_tasks = dao.select_queued(status="stale")
print(f"Total tasks: {total_tasks}")
print(f"Failed tasks: {len(failed_tasks)}")
print(f"Running tasks: {len(running_tasks)}")
print(f"Stale tasks: {len(stale_tasks)}")
Handle orphaned tasks:
# Clean up orphaned tasks
orphaned_count = dao.fail_orphaned_tasks()
print(f"Marked {orphaned_count} tasks as orphaned")
Create custom task management:
# Custom task retry logic
def retry_failed_tasks(dao, max_retries=3):
    failed_tasks = dao.select_queued(status="failed")
    for task in failed_tasks:
        if task['retries'] < max_retries:
            dao.update(task['id'], {
                'status': 'pending',
                'retries': task['retries'] + 1,
                'error': None
            })
            print(f"Retrying task {task['id']}")

# Clean up stale tasks
def cleanup_stale_tasks(dao, days_old=30):
    from datetime import datetime, timedelta

    cutoff_date = datetime.utcnow() - timedelta(days=days_old)  # scheduler timestamps are UTC-naive
    stale_tasks = dao.select_queued(status="stale")
    for task in stale_tasks:
        if task['created_at'] < cutoff_date:
            dao.delete(task['id'])
            print(f"Deleted stale task {task['id']}")
Code Organization¶
python/lsst/consdb/transformed_efd/
├── __init__.py # Module documentation and exports
├── config/ # Instrument configuration YAML files
├── config_model.py # Pydantic validation models
├── dao/ # Data access objects
│ ├── __init__.py
│ ├── base.py # DBBase foundation class
│ ├── butler.py # Butler metadata access
│ ├── influxdb.py # EFD data querying
│ ├── exposure_efd.py # Output table operations
│ ├── visit_efd.py
│ └── transformd.py # Task scheduling
├── generate_schema_from_config.py # Schema generation
├── queue_manager.py # Task scheduling
├── schemas/ # Generated database schemas
├── summary.py # Statistical processing
├── transform.py # Main transformation logic
└── transform_efd.py # CLI entry point