Announcing the SQL Operator

Jad Naous
July 14, 2025

Introducing SQL Operators: The Most Powerful Way to Transform Your Observability Data

Imagine if your log data was like clay—malleable, shapeable, ready to become exactly what you need it to be. What if you could take raw logs flowing through your systems and sculpt them in real-time into custom metrics, business-specific traces, security alerts, and compliance reports? What if you could do all of this using familiar SQL, without complex configurations or specialized tooling?

Today, we're excited to announce our most powerful and differentiating feature: SQL Operators. This isn't just another log pipeline—it's a paradigm shift that puts the full power of Apache Flink SQL at your fingertips, allowing you to transform streaming log data into exactly the observability insights your business needs.

Traditional observability tools force you to work within their predefined structures and limitations. APM tools show you technical traces but miss business contexts. SIEM tools catch individual security events but struggle to connect attack patterns. Metrics platforms require extensive instrumentation and can't adapt to changing business logic.

SQL Operators change everything. Your logs become raw material that you can reshape, aggregate, correlate, and enrich in real-time using the SQL you already know.

The Power of Real-Time SQL Processing

Our SQL Transform operator brings the full power of Apache Flink SQL to your log streams, enabling you to process, aggregate, and transform log data in real-time using familiar SQL syntax. This isn't just about filtering logs—it's about creating entirely new observability patterns.

Real-Time Aggregations and Log-Based Metrics

Traditional metrics collection often requires instrumenting your code and waiting for data to flow through complex pipelines. With SQL operators, you can create metrics directly from your existing logs in real-time:

-- Create real-time error rate metrics per service
SELECT
  window_start as eventtimestamp,
  CONCAT('Service ', service, ' error rate: ', 
         CAST(ROUND(error_rate * 100, 2) AS STRING), '%') as message,
  CASE WHEN error_rate > 0.05 THEN 17 ELSE 9 END as severity,
  MAP['service', ARRAY[service], 'metric_type', ARRAY['error_rate']] as tags,
  JSON_OBJECT('error_rate' VALUE error_rate, 'total_requests' VALUE total_count) as attributes
FROM (
  SELECT
    window_start,
    tags['service'][1] as service,
    CAST(SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) AS DOUBLE) / COUNT(*) as error_rate,
    COUNT(*) as total_count
  FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '1' MINUTE))
  GROUP BY window_start, window_end, tags['service'][1]
)

This creates new log events that represent your error rate metrics, complete with alerting logic built right into the severity levels.
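To make that concrete, here is the shape of an event this query might emit. The service name and values are hypothetical, chosen to show the severity logic firing (an error rate above 5% yields severity 17):

```json
{
  "eventtimestamp": "2025-07-14T10:01:00Z",
  "message": "Service checkout error rate: 6.25%",
  "severity": 17,
  "tags": { "service": ["checkout"], "metric_type": ["error_rate"] },
  "attributes": { "error_rate": 0.0625, "total_requests": 320 }
}
```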

Flexible Trace Creation: The Gaming Company Example

Here's where SQL operators really shine. Imagine you're running a gaming company with multiplayer servers. Traditional APM would show you individual API calls, but wouldn't help you understand the complete player experience across a multiplayer session.

Consider this scenario: Players join a multiplayer game server, the server accepts requests from multiple players, and a game session begins. You want to trace the entire session lifecycle—from initial player connections through game completion—regardless of which specific services handle each request.

With SQL operators, you can create custom traces by joining all related events:

-- Create session-based traces for multiplayer gaming
SELECT
  session_id as id,
  window_end as eventtimestamp,
  CONCAT('Multiplayer session ', session_id, ' completed: ', 
         CAST(player_count AS STRING), ' players, ', 
         CAST(session_duration_minutes AS STRING), ' minutes') as message,
  CASE 
    WHEN session_duration_minutes < 2 THEN 13  -- WARN: Very short session
    WHEN error_count > 0 THEN 17               -- ERROR: Session had issues
    ELSE 9                                     -- INFO: Normal session
  END as severity,
  MAP['session_id', ARRAY[session_id], 'trace_type', ARRAY['multiplayer_session']] as tags,
  JSON_OBJECT(
    'session_duration_minutes' VALUE session_duration_minutes,
    'player_count' VALUE player_count,
    'total_events' VALUE total_events,
    'error_count' VALUE error_count,
    'game_rounds' VALUE game_rounds,
    'session_outcome' VALUE session_outcome
  ) as attributes
FROM (
  SELECT
    window_start,
    window_end,
    JSON_VALUE(attributes, '$.session_id') as session_id,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.player_id')) as player_count,
    COUNT(*) as total_events,
    SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) as error_count,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.round_id')) as game_rounds,
    TIMESTAMPDIFF(MINUTE, window_start, window_end) as session_duration_minutes,
    CASE 
      WHEN SUM(CASE WHEN message LIKE '%game_complete%' THEN 1 ELSE 0 END) > 0 THEN 'completed'
      WHEN SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) > 0 THEN 'error'
      ELSE 'abandoned'
    END as session_outcome
  FROM TABLE(
    SESSION(
      TABLE logs PARTITION BY JSON_VALUE(attributes, '$.session_id'),
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '5' MINUTES  -- End session if no activity for 5 minutes
    )
  )
  WHERE JSON_EXISTS(attributes, '$.session_id')
  GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.session_id')
)

This creates a complete trace for each multiplayer session, tracking everything from player connections to game completion, regardless of which microservices actually handled the individual requests.

Dynamic Join Conditions: Server-Level Analysis

The real power comes from the ability to dynamically change your analysis perspective. Want to switch from session-level traces to server-level metrics? Just change your grouping:

-- Switch perspective: analyze by game server instead of session
SELECT
  server_id,
  window_start as eventtimestamp,
  CONCAT('Game server ', server_id, ' handled ', 
         CAST(session_count AS STRING), ' sessions with ', 
         CAST(ROUND(CAST(total_players AS DOUBLE) / session_count, 1) AS STRING), ' avg players') as message,
  MAP['server_id', ARRAY[server_id], 'analysis_type', ARRAY['server_performance']] as tags,
  JSON_OBJECT(
    'session_count' VALUE session_count,
    'total_players' VALUE total_players,
    'avg_session_duration' VALUE avg_session_duration,
    'server_utilization' VALUE server_utilization
  ) as attributes
FROM (
  SELECT
    window_start,
    JSON_VALUE(attributes, '$.server_id') as server_id,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.session_id')) as session_count,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.player_id')) as total_players,
    AVG(session_duration) as avg_session_duration,
    ROUND(COUNT(*) / 600.0, 2) as server_utilization  -- Events per second over the 10-minute window as a utilization proxy
  FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '10' MINUTES))
  WHERE JSON_EXISTS(attributes, '$.server_id')
  GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.server_id')
)

Making Troubleshooting Faster

By creating these summarized log events, you're not just collecting data—you're pre-processing it for faster troubleshooting. Instead of searching through thousands of individual log lines to understand what happened during a problematic gaming session, you have a single, rich log event that tells the complete story.

These aggregated events can include:

  • Session success/failure indicators
  • Performance metrics (duration, player count, errors)
  • Business context (game type, server performance, player behavior)
  • Links back to the original detailed logs when needed
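
As a sketch of what this buys you in practice, assuming the summarized events flow back into the same logs table with the tags shown above, an on-call engineer can pull up a problem session's full story with one query instead of a raw-log search (the session ID here is a placeholder):

```sql
-- Fetch the single summary event for a suspect multiplayer session,
-- assuming summary events carry the trace_type tag set earlier
SELECT eventtimestamp, message, severity, attributes
FROM logs
WHERE tags['trace_type'][1] = 'multiplayer_session'
  AND tags['session_id'][1] = 'abc-123'  -- hypothetical session ID
```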

AI-Powered SQL Generation: Making Complex Queries Simple

While the SQL examples above might look complex, they were themselves generated with AI assistance, and producing queries like them has never been easier. With comprehensive documentation and an AI assistant, you can describe your observability needs in plain English and get production-ready SQL code.

For example, you could simply ask:

"I need to track multiplayer gaming sessions, grouping all events by session ID, and create summary logs that show session duration, player count, and whether the session completed successfully or had errors."

AI can then generate the appropriate SQL using the documented patterns, complete with:

  • Proper windowing functions for real-time processing
  • Correct JSON extraction from log attributes
  • Appropriate severity levels and message formatting
  • Optimized performance considerations

We’ll soon be integrating AI-powered pipeline building into our editor, and with our upcoming live editing feature you will be able to see exactly what the outcome of any change will look like and iterate on it!

Advanced Security Use Case: Real-Time Threat Detection and Response

One of the most powerful applications of SQL operators is in cybersecurity, where traditional SIEM tools often struggle with complex, multi-stage attacks that unfold across different systems and timeframes. Let's explore how SQL operators can detect and trace advanced persistent threats (APTs) in real-time.

The Advanced Threat Scenario: Lateral Movement Detection

Imagine you're defending against a sophisticated attacker who has gained initial access to your network and is now attempting lateral movement. Traditional security tools might catch individual suspicious events, but they struggle to connect the dots across the entire attack chain.

The attack pattern looks like this:

  1. Initial compromise via phishing email
  2. Credential dumping on the compromised host
  3. Authentication attempts against multiple internal services
  4. Privilege escalation on a domain controller
  5. Data exfiltration from file servers

With SQL operators, you can create real-time threat hunting queries that automatically correlate these events and generate high-fidelity alerts:

-- Detect lateral movement patterns in real-time
SELECT
  attack_chain_id as id,
  window_end as eventtimestamp,
  CONCAT('SECURITY ALERT: Lateral movement detected for user ', username, 
         ' across ', CAST(affected_hosts AS STRING), ' hosts with ', 
         CAST(privilege_escalations AS STRING), ' privilege escalations') as message,
  21 as severity,  -- CRITICAL severity for confirmed attack patterns
  MAP[
    'username', ARRAY[username],
    'alert_type', ARRAY['lateral_movement'],
    'threat_level', ARRAY['critical'],
    'attack_stage', ARRAY['active_breach']
  ] as tags,
  JSON_OBJECT(
    'attack_timeline_minutes' VALUE attack_duration_minutes,
    'affected_hosts' VALUE affected_hosts,
    'authentication_attempts' VALUE auth_attempts,
    'privilege_escalations' VALUE privilege_escalations,
    'data_access_events' VALUE data_events,
    'threat_score' VALUE threat_score,
    'recommended_action' VALUE 'immediate_isolation',
    'attack_vector' VALUE initial_vector
  ) as attributes
FROM (
  SELECT
    window_start,
    window_end,
    JSON_VALUE(attributes, '$.username') as username,
    MD5(CONCAT(JSON_VALUE(attributes, '$.username'), CAST(window_start AS STRING))) as attack_chain_id,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) as affected_hosts,
    COUNT(*) FILTER (WHERE message LIKE '%authentication%') as auth_attempts,
    COUNT(*) FILTER (WHERE message LIKE '%privilege%' OR severity >= 17) as privilege_escalations,
    COUNT(*) FILTER (WHERE message LIKE '%file_access%' OR message LIKE '%data%') as data_events,
    TIMESTAMPDIFF(MINUTE, window_start, window_end) as attack_duration_minutes,
    -- Calculate threat score based on multiple factors
    (COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) * 10) + 
    (COUNT(*) FILTER (WHERE severity >= 17) * 5) + 
    (COUNT(*) FILTER (WHERE message LIKE '%admin%' OR message LIKE '%root%') * 15) as threat_score,
    FIRST_VALUE(JSON_VALUE(attributes, '$.attack_vector')) as initial_vector
  FROM TABLE(
    SESSION(
      TABLE logs PARTITION BY JSON_VALUE(attributes, '$.username'),
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '30' MINUTES  -- Group events within 30-minute windows per user
    )
  )
  WHERE 
    -- Focus on authentication and privilege-related events
    (message LIKE '%authentication%' OR message LIKE '%login%' OR 
     message LIKE '%privilege%' OR message LIKE '%sudo%' OR 
     message LIKE '%admin%' OR severity >= 17)
    AND JSON_EXISTS(attributes, '$.username')
  GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.username')
  -- Only trigger on suspicious patterns
  HAVING 
    COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) >= 3  -- Multiple hosts
    AND COUNT(*) FILTER (WHERE severity >= 17) >= 2  -- Multiple high-severity events
    AND COUNT(*) >= 5  -- Minimum activity threshold
)

Automated Threat Intelligence Enrichment

SQL operators can also enrich security events with threat intelligence in real-time, creating contextual alerts that help security teams prioritize their response:

-- Enrich security events with threat intelligence and risk scoring
SELECT
  *,
  -- Recommended response actions, based on the risk score computed below
  CASE
    WHEN JSON_VALUE(attributes, '$.source_ip') IN ('192.168.1.100', '10.0.0.50')
    THEN 'immediate_block_and_investigate'
    WHEN risk_score >= 75
    THEN 'escalate_to_soc_analyst'
    WHEN risk_score >= 60
    THEN 'monitor_and_correlate'
    ELSE 'log_for_baseline'
  END as recommended_action
FROM (
  SELECT
    *,
    CASE 
      WHEN JSON_VALUE(attributes, '$.source_ip') IN (
        '192.168.1.100', '10.0.0.50'  -- Known compromised IPs
      ) THEN 'known_bad_actor'
      WHEN JSON_VALUE(attributes, '$.source_country') IN ('CN', 'RU', 'KP') 
           AND EXTRACT(HOUR FROM TO_TIMESTAMP_LTZ(eventtimestamp, 3)) BETWEEN 2 AND 6 
      THEN 'suspicious_geography_time'
      WHEN message LIKE '%brute_force%' OR message LIKE '%dictionary_attack%'
      THEN 'automated_attack'
      ELSE 'standard_event'
    END as threat_classification,

    -- Risk score calculation (computed in this subquery so the outer query can reference it)
    CASE
      WHEN severity >= 20 THEN 100  -- Critical events
      WHEN JSON_VALUE(attributes, '$.failed_attempts') IS NOT NULL 
           AND CAST(JSON_VALUE(attributes, '$.failed_attempts') AS INT) > 10 THEN 85
      WHEN message LIKE '%admin%' AND severity >= 17 THEN 75
      WHEN JSON_VALUE(attributes, '$.after_hours') = 'true' THEN 60
      ELSE 25
    END as risk_score
  FROM logs
  WHERE 
    tags['service'][1] IN ('auth', 'vpn', 'ssh', 'rdp', 'admin')
    AND (severity >= 13 OR message LIKE '%security%' OR message LIKE '%auth%')
)

Compliance and Forensic Readiness

The SQL operator can also create compliance-ready audit logs and forensic timelines automatically:

-- Create forensic timeline for security incidents
SELECT
  CONCAT('AUDIT: User ', username, ' action: ', action_type, 
         ' on resource: ', resource, ' result: ', result) as message,
  9 as severity,  -- INFO level for audit trails
  MAP[
    'audit_type', ARRAY['user_activity'],
    'compliance', ARRAY['sox', 'gdpr', 'hipaa'],
    'forensic_ready', ARRAY['true']
  ] as tags,
  JSON_OBJECT(
    'username' VALUE username,
    'action_type' VALUE action_type,
    'resource_accessed' VALUE resource,
    'action_result' VALUE result,
    'source_ip' VALUE source_ip,
    'user_agent' VALUE user_agent,
    'session_id' VALUE session_id,
    'privilege_level' VALUE privilege_level,
    'data_classification' VALUE data_classification,
    'retention_period_days' VALUE 2555  -- 7 years for compliance
  ) as attributes
FROM (
  SELECT
    JSON_VALUE(attributes, '$.username') as username,
    CASE 
      WHEN message LIKE '%create%' THEN 'create'
      WHEN message LIKE '%read%' OR message LIKE '%view%' THEN 'read'
      WHEN message LIKE '%update%' OR message LIKE '%modify%' THEN 'update'
      WHEN message LIKE '%delete%' THEN 'delete'
      ELSE 'unknown'
    END as action_type,
    JSON_VALUE(attributes, '$.resource') as resource,
    CASE WHEN severity < 13 THEN 'success' ELSE 'failure' END as result,
    JSON_VALUE(attributes, '$.source_ip') as source_ip,
    JSON_VALUE(attributes, '$.user_agent') as user_agent,
    JSON_VALUE(attributes, '$.session_id') as session_id,
    JSON_VALUE(attributes, '$.privilege_level') as privilege_level,
    CASE 
      WHEN JSON_VALUE(attributes, '$.resource') LIKE '%pii%' THEN 'sensitive'
      WHEN JSON_VALUE(attributes, '$.resource') LIKE '%financial%' THEN 'restricted'
      ELSE 'internal'
    END as data_classification
  FROM logs
  WHERE JSON_EXISTS(attributes, '$.username') 
    AND JSON_EXISTS(attributes, '$.resource')
)

The Security Advantage

This approach provides several key advantages over traditional security tools:

  1. Real-time correlation: Events are connected as they happen, not hours later during batch processing
  2. Business context: Security events include business-relevant information like data classification and user roles
  3. Automated enrichment: Threat intelligence and risk scoring happen automatically
  4. Compliance-ready: Audit trails are generated in the format required for regulatory compliance
  5. Custom detection logic: Security teams can encode their specific threat models and detection rules

With AI assistance, security teams can describe complex attack patterns in plain English and get sophisticated detection queries that would typically require deep expertise in both cybersecurity and stream processing.

The Future of Observability

SQL operators represent a shift from passive log collection to active data processing. Instead of just storing logs and hoping to find patterns later, you're creating the exact observability data you need in real-time, using the business logic that matters to your application.

Whether you're tracking multiplayer gaming sessions, e-commerce checkout flows, or complex financial transactions, SQL operators let you define traces and metrics that follow your users' actual journey—not just your technical architecture.

Combined with AI assistance for query generation, this approach makes advanced observability accessible to teams of all sizes and skill levels. You can focus on understanding your business requirements while AI handles the technical complexity of stream processing.

The result? Faster incident resolution, better user experience insights, and observability that actually matches how your business operates—all achievable without deep expertise in stream processing frameworks.

Getting Started

Ready to transform your observability approach? Start by:

  1. Identifying a business flow that spans multiple services
  2. Describing your ideal trace or metric in plain English
  3. Using AI to generate the SQL transform based on your log structure
  4. Iterating and refining based on your specific needs
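
If you want something even smaller to begin with, a first transform can be a simple pass-through filter on the same log schema used throughout this post. The service name below is a placeholder:

```sql
-- Minimal first transform: keep only WARN-and-above events for one service
-- and tag them so the output is easy to find downstream
SELECT
  eventtimestamp,
  message,
  severity,
  MAP['service', tags['service'], 'filtered', ARRAY['true']] as tags,
  attributes
FROM logs
WHERE tags['service'][1] = 'checkout'  -- hypothetical service name
  AND severity >= 13                   -- WARN and above
```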

The future of observability is here, and it speaks SQL—with AI as your guide. Try Grepr free today! Sign up at app.grepr.ai/signup or contact us on our website at grepr.ai.
