Introducing SQL Operators: The Most Powerful Way to Transform Your Observability Data
Imagine if your log data were like clay—malleable, shapeable, ready to become exactly what you need it to be. What if you could take raw logs flowing through your systems and sculpt them in real-time into custom metrics, business-specific traces, security alerts, and compliance reports? What if you could do all of this using familiar SQL, without complex configurations or specialized tooling?
Today, we're excited to announce our most powerful and differentiating feature: SQL Operators. This isn't just another log pipeline—it's a paradigm shift that puts the full power of Apache Flink SQL at your fingertips, allowing you to transform streaming log data into exactly the observability insights your business needs.
Traditional observability tools force you to work within their predefined structures and limitations. APM tools show you technical traces but miss business contexts. SIEM tools catch individual security events but struggle to connect attack patterns. Metrics platforms require extensive instrumentation and can't adapt to changing business logic.
SQL Operators change everything. Your logs become raw material that you can reshape, aggregate, correlate, and enrich in real-time using the SQL you already know.
The Power of Real-Time SQL Processing
Our SQL Transform operator brings the full power of Apache Flink SQL to your log streams, enabling you to process, aggregate, and transform log data in real-time using familiar SQL syntax. This isn't just about filtering logs—it's about creating entirely new observability patterns.
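Before getting into aggregations, it helps to see the basic shape of a transform. Here's a minimal row-level sketch that assumes the log schema used throughout this post (eventtimestamp, message, severity, tags, attributes) and a hypothetical duration_ms attribute; it simply rewrites slow requests as WARN-level events:
-- Illustrative row-level transform: flag slow requests as WARN events
-- (the duration_ms attribute is an assumption for this sketch)
SELECT
    eventtimestamp,
    CONCAT('Slow request: ', message) as message,
    13 as severity, -- WARN
    tags,
    attributes
FROM logs
WHERE CAST(JSON_VALUE(attributes, '$.duration_ms') AS INT) > 1000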
Real-Time Aggregations and Log-Based Metrics
Traditional metrics collection often requires instrumenting your code and waiting for data to flow through complex pipelines. With SQL operators, you can create metrics directly from your existing logs in real-time:
-- Create real-time error rate metrics per service
SELECT
window_start as eventtimestamp,
CONCAT('Service ', service, ' error rate: ',
CAST(ROUND(error_rate * 100, 2) AS STRING), '%') as message,
CASE WHEN error_rate > 0.05 THEN 17 ELSE 9 END as severity,
MAP['service', ARRAY[service], 'metric_type', ARRAY['error_rate']] as tags,
JSON_OBJECT('error_rate' VALUE error_rate, 'total_requests' VALUE total_count) as attributes
FROM (
SELECT
window_start,
tags['service'][1] as service,
CAST(SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) AS DOUBLE) / COUNT(*) as error_rate,
COUNT(*) as total_count
FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, tags['service'][1]
)
This creates new log events that represent your error rate metrics, complete with alerting logic built right into the severity levels.
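Because the alerting logic lives in the severity field, a downstream query or operator can pick out just the breached thresholds. A minimal sketch, assuming the derived metric events flow through the same logs table with the metric_type tag shown above:
-- Keep only error-rate metric events that crossed the 5% threshold
SELECT *
FROM logs
WHERE tags['metric_type'][1] = 'error_rate'
  AND severity >= 17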
Flexible Trace Creation: The Gaming Company Example
Here's where SQL operators really shine. Imagine you're running a gaming company with multiplayer servers. Traditional APM would show you individual API calls, but wouldn't help you understand the complete player experience across a multiplayer session.
Consider this scenario: Players join a multiplayer game server, the server accepts requests from multiple players, and a game session begins. You want to trace the entire session lifecycle—from initial player connections through game completion—regardless of which specific services handle each request.
With SQL operators, you can create custom traces by joining all related events:
-- Create session-based traces for multiplayer gaming
SELECT
session_id as id,
window_end as eventtimestamp,
CONCAT('Multiplayer session ', session_id, ' completed: ',
CAST(player_count AS STRING), ' players, ',
CAST(session_duration_minutes AS STRING), ' minutes') as message,
CASE
WHEN session_duration_minutes < 2 THEN 13 -- WARN: Very short session
WHEN error_count > 0 THEN 17 -- ERROR: Session had issues
ELSE 9 -- INFO: Normal session
END as severity,
MAP['session_id', ARRAY[session_id], 'trace_type', ARRAY['multiplayer_session']] as tags,
JSON_OBJECT(
'session_duration_minutes' VALUE session_duration_minutes,
'player_count' VALUE player_count,
'total_events' VALUE total_events,
'error_count' VALUE error_count,
'game_rounds' VALUE game_rounds,
'session_outcome' VALUE session_outcome
) as attributes
FROM (
SELECT
window_start,
window_end,
JSON_VALUE(attributes, '$.session_id') as session_id,
COUNT(DISTINCT JSON_VALUE(attributes, '$.player_id')) as player_count,
COUNT(*) as total_events,
SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) as error_count,
COUNT(DISTINCT JSON_VALUE(attributes, '$.round_id')) as game_rounds,
TIMESTAMPDIFF(MINUTE, window_start, window_end) as session_duration_minutes,
CASE
WHEN SUM(CASE WHEN message LIKE '%game_complete%' THEN 1 ELSE 0 END) > 0 THEN 'completed'
WHEN SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) > 0 THEN 'error'
ELSE 'abandoned'
END as session_outcome
FROM TABLE(
SESSION(
TABLE logs PARTITION BY JSON_VALUE(attributes, '$.session_id'),
DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
INTERVAL '5' MINUTES -- End session if no activity for 5 minutes
)
)
WHERE JSON_EXISTS(attributes, '$.session_id')
GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.session_id')
)
This creates a complete trace for each multiplayer session, tracking everything from player connections to game completion, regardless of which microservices actually handled the individual requests.
Dynamic Join Conditions: Server-Level Analysis
The real power comes from the ability to dynamically change your analysis perspective. Want to switch from session-level traces to server-level metrics? Just change your grouping:
-- Switch perspective: analyze by game server instead of session
SELECT
server_id,
window_start as eventtimestamp,
CONCAT('Game server ', server_id, ' handled ',
CAST(session_count AS STRING), ' sessions with ',
CAST(avg_players_per_session AS STRING), ' avg players') as message,
MAP['server_id', ARRAY[server_id], 'analysis_type', ARRAY['server_performance']] as tags,
JSON_OBJECT(
'session_count' VALUE session_count,
'total_players' VALUE total_players,
'avg_session_duration' VALUE avg_session_duration,
'server_utilization' VALUE server_utilization
) as attributes
FROM (
SELECT
window_start,
JSON_VALUE(attributes, '$.server_id') as server_id,
COUNT(DISTINCT JSON_VALUE(attributes, '$.session_id')) as session_count,
COUNT(DISTINCT JSON_VALUE(attributes, '$.player_id')) as total_players,
AVG(CAST(JSON_VALUE(attributes, '$.session_duration') AS DOUBLE)) as avg_session_duration,
ROUND(COUNT(*) / 600.0, 2) as server_utilization -- Events per second over the 10-minute window as a utilization proxy
FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '10' MINUTES))
WHERE JSON_EXISTS(attributes, '$.server_id')
GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.server_id')
)
Making Troubleshooting Faster
By creating these summarized log events, you're not just collecting data—you're pre-processing it for faster troubleshooting. Instead of searching through thousands of individual log lines to understand what happened during a problematic gaming session, you have a single, rich log event that tells the complete story.
These aggregated events can include:
- Session success/failure indicators
- Performance metrics (duration, player count, errors)
- Business context (game type, server performance, player behavior)
- Links back to the original detailed logs when needed (see the drill-down sketch below)
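That link back can be as simple as carrying the session identifier in the summary event and pivoting on it when you need the detail. A hypothetical drill-down query (the session ID value here is purely illustrative):
-- Pull the raw events behind one summarized multiplayer session
SELECT eventtimestamp, severity, message
FROM logs
WHERE JSON_VALUE(attributes, '$.session_id') = 'session-1234'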
AI-Powered SQL Generation: Making Complex Queries Simple
While the SQL examples above might look complex, the reality is that they were generated by AI, and producing queries like them has never been easier. With comprehensive documentation and AI assistance, you can describe your observability needs in plain English and get production-ready SQL code.
For example, you could simply ask:
"I need to track multiplayer gaming sessions, grouping all events by session ID, and create summary logs that show session duration, player count, and whether the session completed successfully or had errors."
AI can then generate the appropriate SQL using the documented patterns, complete with:
- Proper windowing functions for real-time processing
- Correct JSON extraction from log attributes
- Appropriate severity levels and message formatting
- Optimized performance considerations
We’ll soon be integrating AI-powered pipeline building into our editor, and with our upcoming live-editing feature you’ll be able to see exactly what the outcome of any change will look like and iterate on it!
Advanced Security Use Case: Real-Time Threat Detection and Response
One of the most powerful applications of SQL operators is in cybersecurity, where traditional SIEM tools often struggle with complex, multi-stage attacks that unfold across different systems and timeframes. Let's explore how SQL operators can detect and trace advanced persistent threats (APTs) in real-time.
The Advanced Threat Scenario: Lateral Movement Detection
Imagine you're defending against a sophisticated attacker who has gained initial access to your network and is now attempting lateral movement. Traditional security tools might catch individual suspicious events, but they struggle to connect the dots across the entire attack chain.
The attack pattern looks like this:
- Initial compromise via phishing email
- Credential dumping on the compromised host
- Authentication attempts against multiple internal services
- Privilege escalation on a domain controller
- Data exfiltration from file servers
With SQL operators, you can create real-time threat hunting queries that automatically correlate these events and generate high-fidelity alerts:
-- Detect lateral movement patterns in real-time
SELECT
attack_chain_id as id,
window_end as eventtimestamp,
CONCAT('SECURITY ALERT: Lateral movement detected for user ', username,
' across ', CAST(affected_hosts AS STRING), ' hosts with ',
CAST(privilege_escalations AS STRING), ' privilege escalations') as message,
21 as severity, -- CRITICAL severity for confirmed attack patterns
MAP[
'username', ARRAY[username],
'alert_type', ARRAY['lateral_movement'],
'threat_level', ARRAY['critical'],
'attack_stage', ARRAY['active_breach']
] as tags,
JSON_OBJECT(
'attack_timeline_minutes' VALUE attack_duration_minutes,
'affected_hosts' VALUE affected_hosts,
'authentication_attempts' VALUE auth_attempts,
'privilege_escalations' VALUE privilege_escalations,
'data_access_events' VALUE data_events,
'threat_score' VALUE threat_score,
'recommended_action' VALUE 'immediate_isolation',
'attack_vector' VALUE initial_vector
) as attributes
FROM (
SELECT
window_start,
window_end,
JSON_VALUE(attributes, '$.username') as username,
MD5(CONCAT(JSON_VALUE(attributes, '$.username'), CAST(window_start AS STRING))) as attack_chain_id,
COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) as affected_hosts,
COUNT(*) FILTER (WHERE message LIKE '%authentication%') as auth_attempts,
COUNT(*) FILTER (WHERE message LIKE '%privilege%' OR severity >= 17) as privilege_escalations,
COUNT(*) FILTER (WHERE message LIKE '%file_access%' OR message LIKE '%data%') as data_events,
TIMESTAMPDIFF(MINUTE, window_start, window_end) as attack_duration_minutes,
-- Calculate threat score based on multiple factors
(COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) * 10) +
(COUNT(*) FILTER (WHERE severity >= 17) * 5) +
(COUNT(*) FILTER (WHERE message LIKE '%admin%' OR message LIKE '%root%') * 15) as threat_score,
FIRST_VALUE(JSON_VALUE(attributes, '$.attack_vector')) as initial_vector
FROM TABLE(
SESSION(
TABLE logs PARTITION BY JSON_VALUE(attributes, '$.username'),
DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
INTERVAL '30' MINUTES -- Group events within 30-minute windows per user
)
)
WHERE
-- Focus on authentication and privilege-related events
(message LIKE '%authentication%' OR message LIKE '%login%' OR
message LIKE '%privilege%' OR message LIKE '%sudo%' OR
message LIKE '%admin%' OR severity >= 17)
AND JSON_EXISTS(attributes, '$.username')
GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.username')
-- Only trigger on suspicious patterns
HAVING
COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) >= 3 -- Multiple hosts
AND COUNT(*) FILTER (WHERE severity >= 17) >= 2 -- Multiple high-severity events
AND COUNT(*) >= 5 -- Minimum activity threshold
)
Automated Threat Intelligence Enrichment
SQL operators can also enrich security events with threat intelligence in real-time, creating contextual alerts that help security teams prioritize their response:
-- Enrich security events with threat intelligence and risk scoring
SELECT
    *,
    -- Recommended response actions, derived from the enriched fields computed below
    CASE
        WHEN JSON_VALUE(attributes, '$.source_ip') IN ('192.168.1.100', '10.0.0.50')
            THEN 'immediate_block_and_investigate'
        WHEN risk_score >= 75
            THEN 'escalate_to_soc_analyst'
        WHEN risk_score >= 60
            THEN 'monitor_and_correlate'
        ELSE 'log_for_baseline'
    END as recommended_action
FROM (
    SELECT
        *,
        CASE
            WHEN JSON_VALUE(attributes, '$.source_ip') IN (
                '192.168.1.100', '10.0.0.50' -- Known compromised IPs
            ) THEN 'known_bad_actor'
            WHEN JSON_VALUE(attributes, '$.source_country') IN ('CN', 'RU', 'KP')
                AND EXTRACT(HOUR FROM TO_TIMESTAMP_LTZ(eventtimestamp, 3)) BETWEEN 2 AND 6
                THEN 'suspicious_geography_time'
            WHEN message LIKE '%brute_force%' OR message LIKE '%dictionary_attack%'
                THEN 'automated_attack'
            ELSE 'standard_event'
        END as threat_classification,
        -- Risk score calculation
        CASE
            WHEN severity >= 20 THEN 100 -- Critical events
            WHEN JSON_VALUE(attributes, '$.failed_attempts') IS NOT NULL
                AND CAST(JSON_VALUE(attributes, '$.failed_attempts') AS INT) > 10 THEN 85
            WHEN message LIKE '%admin%' AND severity >= 17 THEN 75
            WHEN JSON_VALUE(attributes, '$.after_hours') = 'true' THEN 60
            ELSE 25
        END as risk_score
    FROM logs
    WHERE
        tags['service'][1] IN ('auth', 'vpn', 'ssh', 'rdp', 'admin')
        AND (severity >= 13 OR message LIKE '%security%' OR message LIKE '%auth%')
)
Compliance and Forensic Readiness
The SQL operator can also create compliance-ready audit logs and forensic timelines automatically:
-- Create forensic timeline for security incidents
SELECT
CONCAT('AUDIT: User ', username, ' action: ', action_type,
' on resource: ', resource, ' result: ', result) as message,
9 as severity, -- INFO level for audit trails
MAP[
'audit_type', ARRAY['user_activity'],
'compliance', ARRAY['sox', 'gdpr', 'hipaa'],
'forensic_ready', ARRAY['true']
] as tags,
JSON_OBJECT(
'username' VALUE username,
'action_type' VALUE action_type,
'resource_accessed' VALUE resource,
'action_result' VALUE result,
'source_ip' VALUE source_ip,
'user_agent' VALUE user_agent,
'session_id' VALUE session_id,
'privilege_level' VALUE privilege_level,
'data_classification' VALUE data_classification,
'retention_period_days' VALUE 2555 -- 7 years for compliance
) as attributes
FROM (
SELECT
JSON_VALUE(attributes, '$.username') as username,
CASE
WHEN message LIKE '%create%' THEN 'create'
WHEN message LIKE '%read%' OR message LIKE '%view%' THEN 'read'
WHEN message LIKE '%update%' OR message LIKE '%modify%' THEN 'update'
WHEN message LIKE '%delete%' THEN 'delete'
ELSE 'unknown'
END as action_type,
JSON_VALUE(attributes, '$.resource') as resource,
CASE WHEN severity < 13 THEN 'success' ELSE 'failure' END as result,
JSON_VALUE(attributes, '$.source_ip') as source_ip,
JSON_VALUE(attributes, '$.user_agent') as user_agent,
JSON_VALUE(attributes, '$.session_id') as session_id,
JSON_VALUE(attributes, '$.privilege_level') as privilege_level,
CASE
WHEN JSON_VALUE(attributes, '$.resource') LIKE '%pii%' THEN 'sensitive'
WHEN JSON_VALUE(attributes, '$.resource') LIKE '%financial%' THEN 'restricted'
ELSE 'internal'
END as data_classification
FROM logs
WHERE JSON_EXISTS(attributes, '$.username')
AND JSON_EXISTS(attributes, '$.resource')
)
The Security Advantage
This approach provides several key advantages over traditional security tools:
- Real-time correlation: Events are connected as they happen, not hours later during batch processing
- Business context: Security events include business-relevant information like data classification and user roles
- Automated enrichment: Threat intelligence and risk scoring happen automatically
- Compliance-ready: Audit trails are generated in the format required for regulatory compliance
- Custom detection logic: Security teams can encode their specific threat models and detection rules directly in SQL (one small example is sketched below)
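On that last point, a custom rule can be just a few lines of SQL. Here's a small sketch following the same windowing and tagging conventions as the queries above; the login_failed message pattern and the 20-failure threshold are illustrative assumptions:
-- Hypothetical custom rule: alert when an account fails to log in more than 20 times in 5 minutes
SELECT
    window_start as eventtimestamp,
    CONCAT('Possible brute force against user ', username, ': ',
        CAST(failed_logins AS STRING), ' failures in 5 minutes') as message,
    17 as severity, -- ERROR
    MAP['alert_type', ARRAY['brute_force'], 'username', ARRAY[username]] as tags,
    JSON_OBJECT('failed_logins' VALUE failed_logins) as attributes
FROM (
    SELECT
        window_start,
        JSON_VALUE(attributes, '$.username') as username,
        COUNT(*) as failed_logins
    FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '5' MINUTES))
    WHERE message LIKE '%login_failed%'
    GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.username')
    HAVING COUNT(*) > 20
)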
With AI assistance, security teams can describe complex attack patterns in plain English and get sophisticated detection queries that would typically require deep expertise in both cybersecurity and stream processing.
The Future of Observability
SQL operators represent a shift from passive log collection to active data processing. Instead of just storing logs and hoping to find patterns later, you're creating the exact observability data you need in real-time, using the business logic that matters to your application.
Whether you're tracking multiplayer gaming sessions, e-commerce checkout flows, or complex financial transactions, SQL operators let you define traces and metrics that follow your users' actual journey—not just your technical architecture.
Combined with AI assistance for query generation, this approach makes advanced observability accessible to teams of all sizes and skill levels. You can focus on understanding your business requirements while AI handles the technical complexity of stream processing.
The result? Faster incident resolution, better user experience insights, and observability that actually matches how your business operates—all achievable without deep expertise in stream processing frameworks.
Getting Started
Ready to transform your observability approach? Start by:
- Identifying a business flow that spans multiple services
- Describing your ideal trace or metric in plain English
- Using AI to generate the SQL transform based on your log structure
- Iterating and refining based on your specific needs
The future of observability is here, and it speaks SQL—with AI as your guide. Try Grepr free today! Sign up at app.grepr.ai/signup or contact us on our website at grepr.ai.