Announcing the SQL Operator

Jad Naous
July 14, 2025

Introducing SQL Operators: The Most Powerful Way to Transform Your Observability Data

Imagine if your log data were like clay—malleable, shapeable, ready to become exactly what you need it to be. What if you could take raw logs flowing through your systems and sculpt them in real-time into custom metrics, business-specific traces, security alerts, and compliance reports? What if you could do all of this using familiar SQL, without complex configurations or specialized tooling?

Today, we're excited to announce our most powerful and differentiating feature: SQL Operators. This isn't just another log pipeline—it's a paradigm shift that puts the full power of Apache Flink SQL at your fingertips, allowing you to transform streaming log data into exactly the observability insights your business needs.

Traditional observability tools force you to work within their predefined structures and limitations. APM tools show you technical traces but miss business contexts. SIEM tools catch individual security events but struggle to connect attack patterns. Metrics platforms require extensive instrumentation and can't adapt to changing business logic.

SQL Operators change everything. Your logs become raw material that you can reshape, aggregate, correlate, and enrich in real-time using the SQL you already know.

The Power of Real-Time SQL Processing

Our SQL Transform operator brings the full power of Apache Flink SQL to your log streams, enabling you to process, aggregate, and transform log data in real-time using familiar SQL syntax. This isn't just about filtering logs—it's about creating entirely new observability patterns.

Real-Time Aggregations and Log-Based Metrics

Traditional metrics collection often requires instrumenting your code and waiting for data to flow through complex pipelines. With SQL operators, you can create metrics directly from your existing logs in real-time:

-- Create real-time error rate metrics per service
SELECT
  window_start as eventtimestamp,
  CONCAT('Service ', service, ' error rate: ', 
         CAST(ROUND(error_rate * 100, 2) AS STRING), '%') as message,
  CASE WHEN error_rate > 0.05 THEN 17 ELSE 9 END as severity,
  MAP['service', ARRAY[service], 'metric_type', ARRAY['error_rate']] as tags,
  JSON_OBJECT('error_rate' VALUE error_rate, 'total_requests' VALUE total_count) as attributes
FROM (
  SELECT
    window_start,
    tags['service'][1] as service,
    CAST(SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) AS DOUBLE) / COUNT(*) as error_rate,
    COUNT(*) as total_count
  FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '1' MINUTE))
  GROUP BY window_start, window_end, tags['service'][1]
)

This creates new log events that represent your error rate metrics, complete with alerting logic built right into the severity levels. (The numeric severity values used throughout these examples map to standard log levels: 9 = INFO, 13 = WARN, 17 = ERROR, 21 = CRITICAL.)

Flexible Trace Creation: The Gaming Company Example

Here's where SQL operators really shine. Imagine you're running a gaming company with multiplayer servers. Traditional APM would show you individual API calls, but wouldn't help you understand the complete player experience across a multiplayer session.

Consider this scenario: Players join a multiplayer game server, the server accepts requests from multiple players, and a game session begins. You want to trace the entire session lifecycle—from initial player connections through game completion—regardless of which specific services handle each request.

With SQL operators, you can create custom traces by joining all related events:

-- Create session-based traces for multiplayer gaming
SELECT
  session_id as id,
  window_end as eventtimestamp,
  CONCAT('Multiplayer session ', session_id, ' completed: ', 
         CAST(player_count AS STRING), ' players, ', 
         CAST(session_duration_minutes AS STRING), ' minutes') as message,
  CASE 
    WHEN session_duration_minutes < 2 THEN 13  -- WARN: Very short session
    WHEN error_count > 0 THEN 17               -- ERROR: Session had issues
    ELSE 9                                     -- INFO: Normal session
  END as severity,
  MAP['session_id', ARRAY[session_id], 'trace_type', ARRAY['multiplayer_session']] as tags,
  JSON_OBJECT(
    'session_duration_minutes' VALUE session_duration_minutes,
    'player_count' VALUE player_count,
    'total_events' VALUE total_events,
    'error_count' VALUE error_count,
    'game_rounds' VALUE game_rounds,
    'session_outcome' VALUE session_outcome
  ) as attributes
FROM (
  SELECT
    window_start,
    window_end,
    JSON_VALUE(attributes, '$.session_id') as session_id,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.player_id')) as player_count,
    COUNT(*) as total_events,
    SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) as error_count,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.round_id')) as game_rounds,
    TIMESTAMPDIFF(MINUTE, window_start, window_end) as session_duration_minutes,
    CASE 
      WHEN SUM(CASE WHEN message LIKE '%game_complete%' THEN 1 ELSE 0 END) > 0 THEN 'completed'
      WHEN SUM(CASE WHEN severity >= 17 THEN 1 ELSE 0 END) > 0 THEN 'error'
      ELSE 'abandoned'
    END as session_outcome
  FROM TABLE(
    SESSION(
      TABLE logs PARTITION BY JSON_VALUE(attributes, '$.session_id'),
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '5' MINUTES  -- End session if no activity for 5 minutes
    )
  )
  WHERE JSON_EXISTS(attributes, '$.session_id')
  GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.session_id')
)

This creates a complete trace for each multiplayer session, tracking everything from player connections to game completion, regardless of which microservices actually handled the individual requests.

Dynamic Join Conditions: Server-Level Analysis

The real power comes from the ability to dynamically change your analysis perspective. Want to switch from session-level traces to server-level metrics? Just change your grouping:

-- Switch perspective: analyze by game server instead of session
SELECT
  server_id,
  window_start as eventtimestamp,
  CONCAT('Game server ', server_id, ' handled ', 
         CAST(session_count AS STRING), ' sessions with ', 
         CAST(total_players AS STRING), ' distinct players') as message,
  MAP['server_id', ARRAY[server_id], 'analysis_type', ARRAY['server_performance']] as tags,
  JSON_OBJECT(
    'session_count' VALUE session_count,
    'total_players' VALUE total_players,
    'avg_session_duration' VALUE avg_session_duration,
    'server_utilization' VALUE server_utilization
  ) as attributes
FROM (
  SELECT
    window_start,
    JSON_VALUE(attributes, '$.server_id') as server_id,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.session_id')) as session_count,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.player_id')) as total_players,
    AVG(CAST(JSON_VALUE(attributes, '$.session_duration') AS DOUBLE)) as avg_session_duration,  -- Assumes raw events carry a session_duration attribute
    ROUND(COUNT(*) / 600.0, 2) as server_utilization  -- Events per second over the 10-minute window as a utilization proxy
  FROM TABLE(TUMBLE(TABLE logs, DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)), INTERVAL '10' MINUTES))
  WHERE JSON_EXISTS(attributes, '$.server_id')
  GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.server_id')
)

Making Troubleshooting Faster

By creating these summarized log events, you're not just collecting data—you're pre-processing it for faster troubleshooting. Instead of searching through thousands of individual log lines to understand what happened during a problematic gaming session, you have a single, rich log event that tells the complete story.

These aggregated events can include:

  • Session success/failure indicators
  • Performance metrics (duration, player count, errors)
  • Business context (game type, server performance, player behavior)
  • Links back to the original detailed logs when needed (see the short sketch below)
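
As a rough sketch, pivoting from a summary event back to the underlying detail is just a filter on the original logs. This assumes, as in the session example above, that each raw event carries a session_id attribute (the session ID below is hypothetical):

-- Drill back into the raw events behind one summarized session
SELECT eventtimestamp, severity, message, attributes
FROM logs
WHERE JSON_VALUE(attributes, '$.session_id') = 'sess-42f7'  -- hypothetical ID copied from a summary event's tags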

AI-Powered SQL Generation: Making Complex Queries Simple

While the SQL examples above might look complex, they were in fact generated by AI, and writing queries like these has never been easier. With comprehensive documentation and AI assistance, you can describe your observability needs in plain English and get production-ready SQL code.

For example, you could simply ask:

"I need to track multiplayer gaming sessions, grouping all events by session ID, and create summary logs that show session duration, player count, and whether the session completed successfully or had errors."

AI can then generate the appropriate SQL using the documented patterns, complete with:

  • Proper windowing functions for real-time processing
  • Correct JSON extraction from log attributes
  • Appropriate severity levels and message formatting
  • Optimized performance considerations

We’ll soon be integrating AI-powered pipeline building into our editor, and with our upcoming live editing feature you’ll be able to see exactly what the outcome of any change will look like and iterate on it!

Advanced Security Use Case: Real-Time Threat Detection and Response

One of the most powerful applications of SQL operators is in cybersecurity, where traditional SIEM tools often struggle with complex, multi-stage attacks that unfold across different systems and timeframes. Let's explore how SQL operators can detect and trace advanced persistent threats (APTs) in real-time.

The Advanced Threat Scenario: Lateral Movement Detection

Imagine you're defending against a sophisticated attacker who has gained initial access to your network and is now attempting lateral movement. Traditional security tools might catch individual suspicious events, but they struggle to connect the dots across the entire attack chain.

The attack pattern looks like this:

  1. Initial compromise via phishing email
  2. Credential dumping on the compromised host
  3. Authentication attempts against multiple internal services
  4. Privilege escalation on a domain controller
  5. Data exfiltration from file servers

With SQL operators, you can create real-time threat hunting queries that automatically correlate these events and generate high-fidelity alerts:

-- Detect lateral movement patterns in real-time
SELECT
  attack_chain_id as id,
  window_end as eventtimestamp,
  CONCAT('SECURITY ALERT: Lateral movement detected for user ', username, 
         ' across ', CAST(affected_hosts AS STRING), ' hosts with ', 
         CAST(privilege_escalations AS STRING), ' privilege escalations') as message,
  21 as severity,  -- CRITICAL severity for confirmed attack patterns
  MAP[
    'username', ARRAY[username],
    'alert_type', ARRAY['lateral_movement'],
    'threat_level', ARRAY['critical'],
    'attack_stage', ARRAY['active_breach']
  ] as tags,
  JSON_OBJECT(
    'attack_timeline_minutes' VALUE attack_duration_minutes,
    'affected_hosts' VALUE affected_hosts,
    'authentication_attempts' VALUE auth_attempts,
    'privilege_escalations' VALUE privilege_escalations,
    'data_access_events' VALUE data_events,
    'threat_score' VALUE threat_score,
    'recommended_action' VALUE 'immediate_isolation',
    'attack_vector' VALUE initial_vector
  ) as attributes
FROM (
  SELECT
    window_start,
    window_end,
    JSON_VALUE(attributes, '$.username') as username,
    MD5(CONCAT(JSON_VALUE(attributes, '$.username'), CAST(window_start AS STRING))) as attack_chain_id,
    COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) as affected_hosts,
    COUNT(*) FILTER (WHERE message LIKE '%authentication%') as auth_attempts,
    COUNT(*) FILTER (WHERE message LIKE '%privilege%' OR severity >= 17) as privilege_escalations,
    COUNT(*) FILTER (WHERE message LIKE '%file_access%' OR message LIKE '%data%') as data_events,
    TIMESTAMPDIFF(MINUTE, window_start, window_end) as attack_duration_minutes,
    -- Calculate threat score based on multiple factors
    (COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) * 10) + 
    (COUNT(*) FILTER (WHERE severity >= 17) * 5) + 
    (COUNT(*) FILTER (WHERE message LIKE '%admin%' OR message LIKE '%root%') * 15) as threat_score,
    FIRST_VALUE(JSON_VALUE(attributes, '$.attack_vector')) as initial_vector  -- First attack vector seen in the session window
  FROM TABLE(
    SESSION(
      TABLE logs PARTITION BY JSON_VALUE(attributes, '$.username'),
      DESCRIPTOR(TO_TIMESTAMP_LTZ(eventtimestamp, 3)),
      INTERVAL '30' MINUTES  -- Group events within 30-minute windows per user
    )
  )
  WHERE 
    -- Focus on authentication and privilege-related events
    (message LIKE '%authentication%' OR message LIKE '%login%' OR 
     message LIKE '%privilege%' OR message LIKE '%sudo%' OR 
     message LIKE '%admin%' OR severity >= 17)
    AND JSON_EXISTS(attributes, '$.username')
  GROUP BY window_start, window_end, JSON_VALUE(attributes, '$.username')
  -- Only trigger on suspicious patterns
  HAVING 
    COUNT(DISTINCT JSON_VALUE(attributes, '$.source_host')) >= 3  -- Multiple hosts
    AND COUNT(*) FILTER (WHERE severity >= 17) >= 2  -- Multiple high-severity events
    AND COUNT(*) >= 5  -- Minimum activity threshold
)

Automated Threat Intelligence Enrichment

SQL operators can also enrich security events with threat intelligence in real-time, creating contextual alerts that help security teams prioritize their response:

-- Enrich security events with threat intelligence and risk scoring
SELECT
  *,
  CASE 
    WHEN JSON_VALUE(attributes, '$.source_ip') IN (
      '192.168.1.100', '10.0.0.50'  -- Known compromised IPs
    ) THEN 'known_bad_actor'
    WHEN JSON_VALUE(attributes, '$.source_country') IN ('CN', 'RU', 'KP') 
         AND EXTRACT(HOUR FROM TO_TIMESTAMP_LTZ(eventtimestamp, 3)) BETWEEN 2 AND 6 
    THEN 'suspicious_geography_time'
    WHEN message LIKE '%brute_force%' OR message LIKE '%dictionary_attack%'
    THEN 'automated_attack'
    ELSE 'standard_event'
  END as threat_classification,

  -- Recommended response actions, based on the risk score computed in the subquery
  CASE
    WHEN JSON_VALUE(attributes, '$.source_ip') IN ('192.168.1.100', '10.0.0.50')
    THEN 'immediate_block_and_investigate'
    WHEN risk_score >= 75
    THEN 'escalate_to_soc_analyst'
    WHEN risk_score >= 60
    THEN 'monitor_and_correlate'
    ELSE 'log_for_baseline'
  END as recommended_action
FROM (
  SELECT
    *,
    -- Risk score calculation (done in a subquery so it can be referenced above)
    CASE
      WHEN severity >= 20 THEN 100  -- Critical events
      WHEN JSON_VALUE(attributes, '$.failed_attempts') IS NOT NULL 
           AND CAST(JSON_VALUE(attributes, '$.failed_attempts') AS INT) > 10 THEN 85
      WHEN message LIKE '%admin%' AND severity >= 17 THEN 75
      WHEN JSON_VALUE(attributes, '$.after_hours') = 'true' THEN 60
      ELSE 25
    END as risk_score
  FROM logs
  WHERE 
    tags['service'][1] IN ('auth', 'vpn', 'ssh', 'rdp', 'admin')
    AND (severity >= 13 OR message LIKE '%security%' OR message LIKE '%auth%')
)

Compliance and Forensic Readiness

The SQL operator can also create compliance-ready audit logs and forensic timelines automatically:

-- Create forensic timeline for security incidents
SELECT
  CONCAT('AUDIT: User ', username, ' action: ', action_type, 
         ' on resource: ', resource, ' result: ', result) as message,
  9 as severity,  -- INFO level for audit trails
  MAP[
    'audit_type', ARRAY['user_activity'],
    'compliance', ARRAY['sox', 'gdpr', 'hipaa'],
    'forensic_ready', ARRAY['true']
  ] as tags,
  JSON_OBJECT(
    'username' VALUE username,
    'action_type' VALUE action_type,
    'resource_accessed' VALUE resource,
    'action_result' VALUE result,
    'source_ip' VALUE source_ip,
    'user_agent' VALUE user_agent,
    'session_id' VALUE session_id,
    'privilege_level' VALUE privilege_level,
    'data_classification' VALUE data_classification,
    'retention_period_days' VALUE 2555  -- 7 years for compliance
  ) as attributes
FROM (
  SELECT
    JSON_VALUE(attributes, '$.username') as username,
    CASE 
      WHEN message LIKE '%create%' THEN 'create'
      WHEN message LIKE '%read%' OR message LIKE '%view%' THEN 'read'
      WHEN message LIKE '%update%' OR message LIKE '%modify%' THEN 'update'
      WHEN message LIKE '%delete%' THEN 'delete'
      ELSE 'unknown'
    END as action_type,
    JSON_VALUE(attributes, '$.resource') as resource,
    CASE WHEN severity < 13 THEN 'success' ELSE 'failure' END as result,
    JSON_VALUE(attributes, '$.source_ip') as source_ip,
    JSON_VALUE(attributes, '$.user_agent') as user_agent,
    JSON_VALUE(attributes, '$.session_id') as session_id,
    JSON_VALUE(attributes, '$.privilege_level') as privilege_level,
    CASE 
      WHEN JSON_VALUE(attributes, '$.resource') LIKE '%pii%' THEN 'sensitive'
      WHEN JSON_VALUE(attributes, '$.resource') LIKE '%financial%' THEN 'restricted'
      ELSE 'internal'
    END as data_classification
  FROM logs
  WHERE JSON_EXISTS(attributes, '$.username') 
    AND JSON_EXISTS(attributes, '$.resource')
)

The Security Advantage

This approach provides several key advantages over traditional security tools:

  1. Real-time correlation: Events are connected as they happen, not hours later during batch processing
  2. Business context: Security events include business-relevant information like data classification and user roles
  3. Automated enrichment: Threat intelligence and risk scoring happen automatically
  4. Compliance-ready: Audit trails are generated in the format required for regulatory compliance
  5. Custom detection logic: Security teams can encode their specific threat models and detection rules

With AI assistance, security teams can describe complex attack patterns in plain English and get sophisticated detection queries that would typically require deep expertise in both cybersecurity and stream processing.

The Future of Observability

SQL operators represent a shift from passive log collection to active data processing. Instead of just storing logs and hoping to find patterns later, you're creating the exact observability data you need in real-time, using the business logic that matters to your application.

Whether you're tracking multiplayer gaming sessions, e-commerce checkout flows, or complex financial transactions, SQL operators let you define traces and metrics that follow your users' actual journey—not just your technical architecture.

Combined with AI assistance for query generation, this approach makes advanced observability accessible to teams of all sizes and skill levels. You can focus on understanding your business requirements while AI handles the technical complexity of stream processing.

The result? Faster incident resolution, better user experience insights, and observability that actually matches how your business operates—all achievable without deep expertise in stream processing frameworks.

Getting Started

Ready to transform your observability approach? Start by:

  1. Identifying a business flow that spans multiple services
  2. Describing your ideal trace or metric in plain English
  3. Using AI to generate the SQL transform based on your log structure
  4. Iterating and refining based on your specific needs (a minimal starter sketch follows)
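
A first transform can be as small as a single filter that tags the flow you care about; everything else in this post builds from there. The sketch below is illustrative only and assumes a hypothetical order_id attribute marking checkout traffic; adapt the field names to your own log structure:

-- Minimal starter sketch: tag every log event that belongs to the checkout flow
SELECT
  eventtimestamp,
  message,
  severity,
  MAP['flow', ARRAY['checkout'], 'order_id', ARRAY[JSON_VALUE(attributes, '$.order_id')]] as tags,
  attributes
FROM logs
WHERE JSON_EXISTS(attributes, '$.order_id')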

The future of observability is here, and it speaks SQL—with AI as your guide. Try Grepr free today! Sign up at app.grepr.ai/signup or contact us on our website at grepr.ai.
