Shashikanth Kota

RxJS Transformation Operators: Examples and Use Cases

Published on June 20, 2023

Introduction

Transformation operators in RxJS allow you to modify, reshape, and process the data emitted by Observables. These operators are essential for converting raw data into the format your application needs. From simple value transformations to complex buffering strategies and mapping operations, transformation operators form the backbone of reactive data processing. This post explores the most commonly used transformation operators with practical examples.

Table of Contents

  1. buffer
  2. bufferCount
  3. bufferTime ⭐
  4. bufferToggle
  5. bufferWhen
  6. concatMap ⭐
  7. concatMapTo
  8. expand
  9. exhaustMap
  10. groupBy
  11. map ⭐
  12. mapTo
  13. mergeMap / flatMap ⭐
  14. mergeScan
  15. partition
  16. pluck
  17. reduce
  18. scan ⭐
  19. switchMap ⭐
  20. switchMapTo
  21. toArray
  22. window
  23. windowCount
  24. windowTime
  25. windowToggle
  26. windowWhen
  27. Comparison of Key Operators

buffer

Purpose: Collects values from the source Observable until a notifier Observable emits, then emits the collected values as an array.

import { interval, fromEvent } from 'rxjs';
import { buffer, take } from 'rxjs/operators';

// Create a source Observable that emits a value every 500ms
const source$ = interval(500);

// Use mouse clicks as the buffer boundary
const clicks$ = fromEvent(document, 'click');

// Buffer values from source until a click occurs
const buffered$ = source$.pipe(
  buffer(clicks$),
  take(3) // Take only 3 buffer emissions for the example
);

// Subscribe to see the buffered values
buffered$.subscribe(
  bufferedValues => console.log('Buffered values:', bufferedValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output (after clicking three times, with varying intervals between clicks):
// Buffered values: [0, 1, 2, 3]
// Buffered values: [4, 5, 6]
// Buffered values: [7, 8, 9, 10, 11]
// Complete

When to use:

bufferCount

Purpose: Collects values from the source Observable until the specified buffer size is reached, then emits the collected values as an array.

import { interval } from 'rxjs';
import { bufferCount, take } from 'rxjs/operators';

// Example 1: Basic bufferCount
console.log('Basic bufferCount example:');
interval(500).pipe(
  bufferCount(3), // Collect 3 values before emitting
  take(4) // Take only 4 buffer emissions for the example
).subscribe(
  buffer => console.log('Buffer of 3:', buffer),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Buffer of 3: [0, 1, 2]
// Buffer of 3: [3, 4, 5]
// Buffer of 3: [6, 7, 8]
// Buffer of 3: [9, 10, 11]
// Complete

// Example 2: Overlapping buffers
console.log('Overlapping bufferCount example:');
interval(500).pipe(
  bufferCount(3, 1), // Buffer size 3, start new buffer every 1 value
  take(4) // Take only 4 buffer emissions for the example
).subscribe(
  buffer => console.log('Overlapping buffer:', buffer),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Overlapping buffer: [0, 1, 2]
// Overlapping buffer: [1, 2, 3]
// Overlapping buffer: [2, 3, 4]
// Overlapping buffer: [3, 4, 5]
// Complete

When to use:

bufferTime ⭐

Purpose: Collects values from the source Observable for a specified duration, then emits the collected values as an array.

import { interval } from 'rxjs';
import { bufferTime, take } from 'rxjs/operators';

// Example 1: Basic bufferTime
console.log('Basic bufferTime example:');
interval(300).pipe(
  bufferTime(1000), // Collect values for 1 second
  take(5) // Take only 5 buffer emissions for the example
).subscribe(
  buffer => console.log('1-second buffer:', buffer),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// 1-second buffer: [0, 1, 2]
// 1-second buffer: [3, 4, 5]
// 1-second buffer: [6, 7, 8]
// 1-second buffer: [9, 10, 11]
// 1-second buffer: [12, 13, 14]
// Complete

// Example 2: Overlapping time buffers
console.log('Overlapping bufferTime example:');
interval(300).pipe(
  bufferTime(1000, 500), // 1-second buffer, new buffer every 500ms
  take(5) // Take only 5 buffer emissions for the example
).subscribe(
  buffer => console.log('Overlapping time buffer:', buffer),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Overlapping time buffer: [0, 1, 2]
// Overlapping time buffer: [1, 2, 3, 4]
// Overlapping time buffer: [3, 4, 5, 6]
// Overlapping time buffer: [5, 6, 7, 8]
// Overlapping time buffer: [7, 8, 9, 10]
// Complete

When to use:

bufferToggle

Purpose: Collects values from the source Observable starting when an opening Observable emits, and ending when a closing Observable emits.

import { interval, timer } from 'rxjs';
import { bufferToggle, take } from 'rxjs/operators';

// Source emits every 100ms
const source$ = interval(100);

// Opening observable: emit every 500ms
const openings$ = interval(500);

// Closing observable factory: emit after 300ms
const closingSelector = () => timer(300);

// Buffer values when openings$ emits, and collect until closingSelector emits
source$.pipe(
  bufferToggle(openings$, closingSelector),
  take(3) // Take only 3 buffer emissions for the example
).subscribe(
  buffer => console.log('Buffered values:', buffer),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Buffered values: [5, 6, 7]
// Buffered values: [10, 11, 12]
// Buffered values: [15, 16, 17]
// Complete

When to use:

bufferWhen

Purpose: Collects values from the source Observable and emits them as an array when a factory function produces an Observable that emits.

import { interval } from 'rxjs';
import { bufferWhen, take } from 'rxjs/operators';

// Source emits every 100ms
const source$ = interval(100);

// Buffer closing factory: create random-duration buffers
const closingSelector = () => {
  // Random buffer duration between 500ms and 1500ms
  const randomDuration = Math.floor(Math.random() * 1000) + 500;
  return interval(randomDuration).pipe(take(1));
};

// Buffer values until closingSelector emits
source$.pipe(
  bufferWhen(closingSelector),
  take(3) // Take only 3 buffer emissions for the example
).subscribe(
  buffer => console.log('Buffered with random duration:', buffer),
  err => console.error(err),
  () => console.log('Complete')
);

// Output (will vary due to random durations):
// Buffered with random duration: [0, 1, 2, 3, 4, 5]
// Buffered with random duration: [6, 7, 8, 9, 10, 11, 12]
// Buffered with random duration: [13, 14, 15, 16]
// Complete

When to use:

concatMap ⭐

Purpose: Maps each value from the source Observable to an inner Observable, then flattens the result in a serialized manner, waiting for each inner Observable to complete before moving to the next.

import { of, interval } from 'rxjs';
import { concatMap, take, delay, map } from 'rxjs/operators';

// Example 1: Basic concatMap with API calls
console.log('Basic concatMap example:');
const userIds = [1, 2, 3, 4];

// Simulate API calls that take different times to complete
function getUserDetails(id: number) {
  // Simulate different response times
  const delayTime = id * 1000;
  return of({ id, name: `User ${id}`, email: `user${id}@example.com` }).pipe(
    delay(delayTime),
    tap(() => console.log(`Fetched details for user ${id} after ${delayTime}ms`))
  );
}

// Process users sequentially, waiting for each API call to complete
of(...userIds).pipe(
  concatMap(id => getUserDetails(id))
).subscribe(
  user => console.log('Processed user:', user),
  err => console.error(err),
  () => console.log('All users processed')
);

// Output:
// Fetched details for user 1 after 1000ms
// Processed user: {id: 1, name: "User 1", email: "user1@example.com"}
// Fetched details for user 2 after 2000ms
// Processed user: {id: 2, name: "User 2", email: "user2@example.com"}
// Fetched details for user 3 after 3000ms
// Processed user: {id: 3, name: "User 3", email: "user3@example.com"}
// Fetched details for user 4 after 4000ms
// Processed user: {id: 4, name: "User 4", email: "user4@example.com"}
// All users processed

// Example 2: concatMap with events
console.log('concatMap with events example:');
const clicks$ = fromEvent(document, 'click');

clicks$.pipe(
  concatMap(() => interval(200).pipe(
    take(3),
    map(i => `Click result: ${i}`)
  ))
).subscribe(
  result => console.log(result)
);

// Output (after clicking twice in succession):
// Click result: 0
// Click result: 1
// Click result: 2
// (only after the first click sequence completes)
// Click result: 0
// Click result: 1
// Click result: 2

When to use:

concatMapTo

Purpose: Maps each value from the source Observable to the same inner Observable, then flattens the result in a serialized manner.

import { fromEvent, interval } from 'rxjs';
import { concatMapTo, take } from 'rxjs/operators';

// Source: click events
const clicks$ = fromEvent(document, 'click');

// Target: always map to the same sequence
const resultSequence$ = interval(500).pipe(
  take(3),
  map(i => `Result ${i}`)
);

// Map each click to the same result sequence
clicks$.pipe(
  concatMapTo(resultSequence$),
  take(6) // Limit the example to 6 emissions
).subscribe(
  result => console.log(result),
  err => console.error(err),
  () => console.log('Complete')
);

// Output (after clicking twice in succession):
// Result 0
// Result 1
// Result 2
// (only after the first sequence completes)
// Result 0
// Result 1
// Result 2
// Complete

When to use:

expand

Purpose: Recursively projects each source value to an Observable, and merges the results into the output Observable.

import { of } from 'rxjs';
import { expand, take } from 'rxjs/operators';

// Start with 1
of(1).pipe(
  // For each value x, return an Observable of x * 2
  expand(x => of(x * 2)),
  take(5) // Limit to 5 values to avoid infinite sequence
).subscribe(
  x => console.log(`Expanded value: ${x}`),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Expanded value: 1
// Expanded value: 2
// Expanded value: 4
// Expanded value: 8
// Expanded value: 16
// Complete

When to use:

exhaustMap

Purpose: Maps each value from the source Observable to an inner Observable, but ignores source values while the previous inner Observable is still active.

import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take, tap } from 'rxjs/operators';

// Source: click events
const clicks$ = fromEvent(document, 'click');

// Map each click to a 3-second countdown, but ignore clicks during the countdown
clicks$.pipe(
  tap(() => console.log('Click detected')),
  exhaustMap(() => interval(1000).pipe(
    take(3),
    tap(i => console.log(`Processing click: ${i + 1} second`))
  ))
).subscribe(
  count => console.log(`Countdown: ${count + 1}`),
  err => console.error(err),
  () => console.log('Complete')
);

// Output (clicking multiple times during the countdown):
// Click detected
// Processing click: 1 second
// Countdown: 1
// Processing click: 2 second
// Countdown: 2
// Processing click: 3 second
// Countdown: 3
// (additional clicks during the countdown are ignored)
// Click detected (only after the previous countdown completes)
// Processing click: 1 second
// Countdown: 1
// ...

When to use:

groupBy

Purpose: Groups emissions from the source Observable based on a specified key selector function, emitting an Observable for each group.

import { from } from 'rxjs';
import { groupBy, mergeMap, toArray } from 'rxjs/operators';

// Sample data: people with different roles
const people = [
  { name: 'Alice', role: 'Developer' },
  { name: 'Bob', role: 'Designer' },
  { name: 'Charlie', role: 'Developer' },
  { name: 'Dave', role: 'Manager' },
  { name: 'Eve', role: 'Designer' },
  { name: 'Frank', role: 'Developer' }
];

// Group people by their role
from(people).pipe(
  // Group by role
  groupBy(person => person.role),
  // For each group, collect all items into an array
  mergeMap(group =>
    group.pipe(
      toArray(),
      map(groupArray => ({
        role: group.key,
        members: groupArray
      }))
    )
  )
).subscribe(
  group => console.log(`${group.role}: ${group.members.map(p => p.name).join(', ')}`),
  err => console.error(err),
  () => console.log('Grouping complete')
);

// Output:
// Developer: Alice, Charlie, Frank
// Designer: Bob, Eve
// Manager: Dave
// Grouping complete

When to use:

map ⭐

Purpose: Transforms each value emitted by the source Observable using a projection function.

import { from } from 'rxjs';
import { map } from 'rxjs/operators';

// Example 1: Basic value transformation
console.log('Basic map example:');
from([1, 2, 3, 4, 5]).pipe(
  map(value => value * 10)
).subscribe(
  result => console.log(`Transformed value: ${result}`),
  err => console.error(err),
  () => console.log('Transformation complete')
);

// Output:
// Transformed value: 10
// Transformed value: 20
// Transformed value: 30
// Transformed value: 40
// Transformed value: 50
// Transformation complete

// Example 2: Object transformation
console.log('Object transformation example:');
from([
  { name: 'Alice', age: 25 },
  { name: 'Bob', age: 30 },
  { name: 'Charlie', age: 35 }
]).pipe(
  map(person => ({
    displayName: person.name.toUpperCase(),
    birthYear: new Date().getFullYear() - person.age,
    canRetireIn: 65 - person.age
  }))
).subscribe(
  result => console.log(result),
  err => console.error(err),
  () => console.log('Object transformation complete')
);

// Output:
// { displayName: 'ALICE', birthYear: 1998, canRetireIn: 40 }
// { displayName: 'BOB', birthYear: 1993, canRetireIn: 35 }
// { displayName: 'CHARLIE', birthYear: 1988, canRetireIn: 30 }
// Object transformation complete

When to use:

mapTo

Purpose: Maps each emission from the source Observable to a constant value.

import { fromEvent, interval } from 'rxjs';
import { mapTo, take } from 'rxjs/operators';

// Example 1: Basic mapTo
console.log('Basic mapTo example:');
interval(1000).pipe(
  mapTo('PING'),
  take(3)
).subscribe(
  result => console.log(result),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// PING
// PING
// PING
// Complete

// Example 2: Event normalization
console.log('Event normalization example:');
const buttonClicks$ = fromEvent(document.getElementById('myButton'), 'click');

buttonClicks$.pipe(
  mapTo({ action: 'BUTTON_CLICKED', timestamp: new Date().toISOString() })
).subscribe(
  event => console.log('Normalized event:', event),
  err => console.error(err)
);

// Output (when button is clicked):
// Normalized event: { action: 'BUTTON_CLICKED', timestamp: '2023-06-20T16:45:23.456Z' }
// Normalized event: { action: 'BUTTON_CLICKED', timestamp: '2023-06-20T16:45:25.789Z' }
// ...

When to use:

mergeMap / flatMap ⭐

Purpose: Maps each value from the source Observable to an inner Observable, then flattens the result by merging all inner Observables into a single Observable.

import { fromEvent, interval, of } from 'rxjs';
import { mergeMap, take, delay } from 'rxjs/operators';

// Example 1: Basic mergeMap with concurrent API calls
console.log('Basic mergeMap example:');
from([1, 2, 3, 4]).pipe(
  mergeMap(id => {
    // Simulate API call with different response times
    console.log(`Making API call for ID: ${id}`);
    return of({ id, name: `Item ${id}` }).pipe(
      delay(1000 / id) // Faster response for higher IDs
    );
  })
).subscribe(
  result => console.log(`Got result for ID ${result.id}: ${result.name}`),
  err => console.error(err),
  () => console.log('All API calls completed')
);

// Output (order may vary due to concurrent execution):
// Making API call for ID: 1
// Making API call for ID: 2
// Making API call for ID: 3
// Making API call for ID: 4
// Got result for ID 4: Item 4
// Got result for ID 3: Item 3
// Got result for ID 2: Item 2
// Got result for ID 1: Item 1
// All API calls completed

// Example 2: Controlling concurrency
console.log('Controlled concurrency example:');
from([1, 2, 3, 4, 5, 6]).pipe(
  // Second parameter limits concurrent inner Observables
  mergeMap(
    id => {
      console.log(`Starting task ${id}`);
      return interval(500).pipe(
        take(3),
        map(x => `Task ${id}: step ${x + 1}`)
      );
    },
    2 // Maximum 2 concurrent inner Observables
  )
).subscribe(
  result => console.log(result),
  err => console.error(err),
  () => console.log('All tasks completed')
);

// Output:
// Starting task 1
// Starting task 2
// Task 1: step 1
// Task 2: step 1
// Task 1: step 2
// Task 2: step 2
// Task 1: step 3
// Task 2: step 3
// Starting task 3
// Starting task 4
// Task 3: step 1
// ...

When to use:

mergeScan

Purpose: Applies an accumulator function to each value from the source Observable where the accumulator returns an Observable, then merges the results into the output Observable.

import { fromEvent, of } from 'rxjs';
import { mergeScan, map } from 'rxjs/operators';

// Track mouse drag distance
const mouseDown$ = fromEvent(document, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');

mouseDown$.pipe(
  // When mouse down, start tracking movement until mouse up
  mergeMap(start => {
    // Extract starting coordinates
    const startX = (start as MouseEvent).clientX;
    const startY = (start as MouseEvent).clientY;

    // Track movement until mouse up
    return mouseMove$.pipe(
      // Use mergeScan to accumulate total distance
      mergeScan(
        (acc, move) => {
          const currentX = (move as MouseEvent).clientX;
          const currentY = (move as MouseEvent).clientY;

          // Calculate distance from last position
          const deltaX = currentX - acc.lastX;
          const deltaY = currentY - acc.lastY;
          const stepDistance = Math.sqrt(deltaX * deltaX + deltaY * deltaY);

          // Return new accumulated state
          return of({
            totalDistance: acc.totalDistance + stepDistance,
            lastX: currentX,
            lastY: currentY
          });
        },
        { totalDistance: 0, lastX: startX, lastY: startY }
      ),
      // Stop tracking when mouse up
      takeUntil(mouseUp$)
    );
  })
).subscribe(
  state => console.log(`Total drag distance: ${state.totalDistance.toFixed(2)}px`),
  err => console.error(err)
);

// Output (while dragging mouse):
// Total drag distance: 10.54px
// Total drag distance: 25.32px
// Total drag distance: 42.18px
// ...

When to use:

partition

Purpose: Splits the source Observable into two Observables: one with values that satisfy a predicate function and one with values that don’t.

import { from, partition } from 'rxjs';

// Sample data
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

// Partition numbers into evens and odds
const [evens$, odds$] = partition(
  from(numbers),
  value => value % 2 === 0
);

// Subscribe to even numbers
evens$.subscribe(
  value => console.log(`Even number: ${value}`),
  err => console.error(err),
  () => console.log('Even numbers complete')
);

// Subscribe to odd numbers
odds$.subscribe(
  value => console.log(`Odd number: ${value}`),
  err => console.error(err),
  () => console.log('Odd numbers complete')
);

// Output:
// Odd number: 1
// Even number: 2
// Odd number: 3
// Even number: 4
// Odd number: 5
// Even number: 6
// Odd number: 7
// Even number: 8
// Odd number: 9
// Even number: 10
// Even numbers complete
// Odd numbers complete

When to use:

pluck

Purpose: Extracts a specified nested property from each emitted object.

import { from } from 'rxjs';
import { pluck } from 'rxjs/operators';

// Sample data with nested properties
const users = [
  { id: 1, name: 'Alice', profile: { age: 25, role: 'Developer' } },
  { id: 2, name: 'Bob', profile: { age: 30, role: 'Designer' } },
  { id: 3, name: 'Charlie', profile: { age: 35, role: 'Manager' } }
];

// Example 1: Extract simple property
console.log('Simple property extraction:');
from(users).pipe(
  pluck('name')
).subscribe(
  name => console.log(`Name: ${name}`),
  err => console.error(err),
  () => console.log('Simple extraction complete')
);

// Output:
// Name: Alice
// Name: Bob
// Name: Charlie
// Simple extraction complete

// Example 2: Extract nested property
console.log('Nested property extraction:');
from(users).pipe(
  pluck('profile', 'role')
).subscribe(
  role => console.log(`Role: ${role}`),
  err => console.error(err),
  () => console.log('Nested extraction complete')
);

// Output:
// Role: Developer
// Role: Designer
// Role: Manager
// Nested extraction complete

When to use:

reduce

Purpose: Applies an accumulator function over the source Observable, and emits the final accumulated value when the source completes.

import { from } from 'rxjs';
import { reduce } from 'rxjs/operators';

// Example 1: Sum of numbers
console.log('Sum example:');
from([1, 2, 3, 4, 5]).pipe(
  reduce((acc, value) => acc + value, 0)
).subscribe(
  sum => console.log(`Sum: ${sum}`),
  err => console.error(err),
  () => console.log('Reduction complete')
);

// Output:
// Sum: 15
// Reduction complete

// Example 2: Building a complex result
console.log('Complex reduction example:');
from([
  { name: 'Alice', sales: 120, department: 'Electronics' },
  { name: 'Bob', sales: 90, department: 'Home' },
  { name: 'Charlie', sales: 150, department: 'Electronics' },
  { name: 'Dave', sales: 80, department: 'Home' },
  { name: 'Eve', sales: 200, department: 'Electronics' }
]).pipe(
  reduce((result, employee) => {
    // Group by department
    if (!result[employee.department]) {
      result[employee.department] = {
        totalSales: 0,
        employees: []
      };
    }

    // Update department data
    result[employee.department].totalSales += employee.sales;
    result[employee.department].employees.push(employee.name);

    return result;
  }, {} as Record<string, { totalSales: number, employees: string[] }>)
).subscribe(
  result => {
    console.log('Sales report:');
    Object.entries(result).forEach(([dept, data]) => {
      console.log(`${dept}: ${data.totalSales} units sold by ${data.employees.join(', ')}`);
    });
  },
  err => console.error(err),
  () => console.log('Report complete')
);

// Output:
// Sales report:
// Electronics: 470 units sold by Alice, Charlie, Eve
// Home: 170 units sold by Bob, Dave
// Report complete

When to use:

scan ⭐

Purpose: Similar to reduce, but emits the accumulated value after each source emission instead of just the final value.

import { from, fromEvent } from 'rxjs';
import { scan, map } from 'rxjs/operators';

// Example 1: Running total
console.log('Running total example:');
from([1, 2, 3, 4, 5]).pipe(
  scan((acc, value) => acc + value, 0)
).subscribe(
  total => console.log(`Running total: ${total}`),
  err => console.error(err),
  () => console.log('Scan complete')
);

// Output:
// Running total: 1
// Running total: 3
// Running total: 6
// Running total: 10
// Running total: 15
// Scan complete

// Example 2: State management
console.log('State management example:');

// Define actions
const increment = { type: 'INCREMENT' };
const decrement = { type: 'DECREMENT' };
const reset = { type: 'RESET' };

// Create action stream
const actions$ = fromEvent(document, 'click').pipe(
  map((event: MouseEvent) => {
    if (event.altKey) return reset;
    return event.shiftKey ? decrement : increment;
  })
);

// Reducer function
function counterReducer(state, action) {
  switch (action.type) {
    case 'INCREMENT':
      return { ...state, count: state.count + 1 };
    case 'DECREMENT':
      return { ...state, count: state.count - 1 };
    case 'RESET':
      return { ...state, count: 0 };
    default:
      return state;
  }
}

// Initial state
const initialState = { count: 0 };

// Create state stream
const state$ = actions$.pipe(
  scan(counterReducer, initialState)
);

// Subscribe to state changes
state$.subscribe(
  state => console.log(`Counter: ${state.count}`),
  err => console.error(err)
);

// Output (after clicking):
// Counter: 1
// Counter: 2
// Counter: 3
// (shift+click) Counter: 2
// (alt+click) Counter: 0

When to use:

switchMap ⭐

Purpose: Maps each value from the source Observable to an inner Observable, then flattens the result by replacing the previous inner Observable with the new one when a new source value arrives.

import { fromEvent, interval, of } from 'rxjs';
import { switchMap, take, delay, tap } from 'rxjs/operators';

// Example 1: Basic switchMap with search
console.log('Basic switchMap example:');

// Simulate a search input field
const searchInput$ = fromEvent(document.getElementById('searchInput'), 'input');

// Use switchMap to handle search requests
searchInput$.pipe(
  tap(() => console.log('Search input changed')),
  // Extract the search term
  map((event: InputEvent) => (event.target as HTMLInputElement).value),
  // Only proceed if search term is at least 3 characters
  filter(term => term.length >= 3),
  // Debounce to avoid too many requests
  debounceTime(300),
  // Use switchMap to cancel previous search and start a new one
  switchMap(term => {
    console.log(`Searching for: ${term}`);
    // Simulate API call
    return of(`Search results for: ${term}`).pipe(
      delay(1000), // Simulate network delay
      tap(() => console.log(`Received results for: ${term}`))
    );
  })
).subscribe(
  results => console.log(results),
  err => console.error(err)
);

// Output (when typing "react" then quickly changing to "reactive"):
// Search input changed
// Search input changed
// Search input changed
// Searching for: rea
// Search input changed
// Search input changed
// Search input changed
// Searching for: reactive
// Received results for: reactive
// Search results for: reactive
// (Note: results for "rea" were cancelled)

// Example 2: HTTP requests with switchMap
console.log('HTTP requests with switchMap example:');

// Simulate user ID selection
const userIds$ = from([1, 2, 3]);

// Function to simulate HTTP request
function getUserDetails(id: number) {
  console.log(`Fetching details for user ${id}...`);
  return of({ id, name: `User ${id}`, email: `user${id}@example.com` }).pipe(
    delay(id * 1000), // Different delay times
    tap(() => console.log(`Completed fetch for user ${id}`))
  );
}

// Use switchMap to always get the latest user
userIds$.pipe(
  // Delay between emissions to see the effect
  concatMap(id => of(id).pipe(delay(500))),
  // Use switchMap to switch to new user and cancel previous request
  switchMap(id => getUserDetails(id))
).subscribe(
  user => console.log('Received user:', user),
  err => console.error(err),
  () => console.log('User sequence complete')
);

// Output:
// Fetching details for user 1...
// Fetching details for user 2... (previous request for user 1 is cancelled)
// Fetching details for user 3... (previous request for user 2 is cancelled)
// Completed fetch for user 3
// Received user: { id: 3, name: 'User 3', email: 'user3@example.com' }
// User sequence complete

When to use:

switchMapTo

Purpose: Maps each value from the source Observable to the same inner Observable, then flattens the result by replacing the previous inner Observable with the new one when a new source value arrives.

import { fromEvent, interval } from 'rxjs';
import { switchMapTo, take, tap } from 'rxjs/operators';

// Example: Reset a timer on each click
console.log('switchMapTo example:');

// Source: click events
const clicks$ = fromEvent(document.getElementById('resetButton'), 'click');

// Target: always the same 5-second countdown
const countdown$ = interval(1000).pipe(
  take(5),
  map(i => 4 - i),
  tap(value => console.log(`Countdown: ${value}`))
);

// Reset the countdown on each click
clicks$.pipe(
  tap(() => console.log('Button clicked, resetting countdown')),
  switchMapTo(countdown$)
).subscribe(
  value => console.log(`Displaying: ${value}`),
  err => console.error(err)
);

// Output (when clicking during countdown):
// Button clicked, resetting countdown
// Countdown: 4
// Displaying: 4
// Countdown: 3
// Displaying: 3
// Button clicked, resetting countdown (clicked again)
// Countdown: 4
// Displaying: 4
// Countdown: 3
// Displaying: 3
// ...

When to use:

toArray

Purpose: Collects all values from the source Observable and emits them as an array when the source completes.

import { interval, of } from 'rxjs';
import { toArray, take, map } from 'rxjs/operators';

// Example 1: Basic toArray
console.log('Basic toArray example:');
of(1, 2, 3, 4, 5).pipe(
  toArray()
).subscribe(
  array => console.log('Collected array:', array),
  err => console.error(err),
  () => console.log('Collection complete')
);

// Output:
// Collected array: [1, 2, 3, 4, 5]
// Collection complete

// Example 2: Collecting async values
console.log('Async collection example:');
interval(500).pipe(
  take(4),
  map(i => ({ id: i, value: `Item ${i}` })),
  toArray()
).subscribe(
  array => console.log('Collected objects:', array),
  err => console.error(err),
  () => console.log('Async collection complete')
);

// Output (after 2 seconds):
// Collected objects: [
//   { id: 0, value: 'Item 0' },
//   { id: 1, value: 'Item 1' },
//   { id: 2, value: 'Item 2' },
//   { id: 3, value: 'Item 3' }
// ]
// Async collection complete

When to use:

window

Purpose: Similar to buffer, but emits nested Observables instead of arrays.

import { interval, fromEvent } from 'rxjs';
import { window, mergeMap, take, toArray } from 'rxjs/operators';

// Create a source Observable that emits a value every 500ms
const source$ = interval(500);

// Use mouse clicks as the window boundary
const clicks$ = fromEvent(document, 'click');

// Window values from source until a click occurs
source$.pipe(
  window(clicks$),
  take(3), // Take only 3 window emissions for the example
  mergeMap(window$ => window$.pipe(toArray())), // Convert each window to an array
).subscribe(
  windowValues => console.log('Window values:', windowValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output (after clicking three times, with varying intervals between clicks):
// Window values: [0, 1, 2, 3]
// Window values: [4, 5, 6]
// Window values: [7, 8, 9, 10, 11]
// Complete

When to use:

windowCount

Purpose: Similar to bufferCount, but emits nested Observables instead of arrays.

import { interval } from 'rxjs';
import { windowCount, mergeMap, take, toArray } from 'rxjs/operators';

// Example 1: Basic windowCount
console.log('Basic windowCount example:');
interval(500).pipe(
  take(10),
  windowCount(3), // Collect 3 values per window
  mergeMap(window$ => window$.pipe(toArray())), // Convert each window to an array
).subscribe(
  windowValues => console.log('Window of 3:', windowValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Window of 3: [0, 1, 2]
// Window of 3: [3, 4, 5]
// Window of 3: [6, 7, 8]
// Window of 3: [9]
// Complete

// Example 2: Overlapping windows
console.log('Overlapping windowCount example:');
interval(500).pipe(
  take(10),
  windowCount(3, 1), // Window size 3, start new window every 1 value
  mergeMap(window$ =>
    window$.pipe(
      toArray(),
      map(arr => arr.join(', '))
    )
  ),
  take(5) // Take only 5 window emissions for the example
).subscribe(
  windowValues => console.log('Overlapping window:', windowValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Overlapping window: 0, 1, 2
// Overlapping window: 1, 2, 3
// Overlapping window: 2, 3, 4
// Overlapping window: 3, 4, 5
// Overlapping window: 4, 5, 6
// Complete

When to use:

windowTime

Purpose: Similar to bufferTime, but emits nested Observables instead of arrays.

import { interval } from 'rxjs';
import { windowTime, mergeMap, take, toArray } from 'rxjs/operators';

// Example 1: Basic windowTime
console.log('Basic windowTime example:');
interval(300).pipe(
  windowTime(1000), // Collect values for 1 second
  take(3), // Take only 3 window emissions for the example
  mergeMap(window$ => window$.pipe(toArray())), // Convert each window to an array
).subscribe(
  windowValues => console.log('1-second window:', windowValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// 1-second window: [0, 1, 2]
// 1-second window: [3, 4, 5]
// 1-second window: [6, 7, 8]
// Complete

// Example 2: Overlapping time windows
console.log('Overlapping windowTime example:');
interval(300).pipe(
  windowTime(1000, 500), // 1-second window, new window every 500ms
  take(3), // Take only 3 window emissions for the example
  mergeMap(window$ => window$.pipe(toArray())), // Convert each window to an array
).subscribe(
  windowValues => console.log('Overlapping time window:', windowValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Overlapping time window: [0, 1, 2]
// Overlapping time window: [1, 2, 3, 4]
// Overlapping time window: [3, 4, 5, 6]
// Complete

When to use:

windowToggle

Purpose: Similar to bufferToggle, but emits nested Observables instead of arrays.

import { interval, timer } from 'rxjs';
import { windowToggle, mergeMap, take, toArray } from 'rxjs/operators';

// Source emits every 100ms
const source$ = interval(100);

// Opening observable: emit every 500ms
const openings$ = interval(500);

// Closing observable factory: emit after 300ms
const closingSelector = () => timer(300);

// Window values when openings$ emits, and collect until closingSelector emits
source$.pipe(
  windowToggle(openings$, closingSelector),
  take(3), // Take only 3 window emissions for the example
  mergeMap(window$ => window$.pipe(toArray())), // Convert each window to an array
).subscribe(
  windowValues => console.log('Windowed values:', windowValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output:
// Windowed values: [5, 6, 7]
// Windowed values: [10, 11, 12]
// Windowed values: [15, 16, 17]
// Complete

When to use:

windowWhen

Purpose: Similar to bufferWhen, but emits nested Observables instead of arrays.

import { interval } from 'rxjs';
import { windowWhen, mergeMap, take, toArray } from 'rxjs/operators';

// Source emits every 100ms
const source$ = interval(100);

// Window closing factory: create random-duration windows
const closingSelector = () => {
  // Random window duration between 500ms and 1500ms
  const randomDuration = Math.floor(Math.random() * 1000) + 500;
  return interval(randomDuration).pipe(take(1));
};

// Window values until closingSelector emits
source$.pipe(
  windowWhen(closingSelector),
  take(3), // Take only 3 window emissions for the example
  mergeMap(window$ => window$.pipe(toArray())), // Convert each window to an array
).subscribe(
  windowValues => console.log('Windowed with random duration:', windowValues),
  err => console.error(err),
  () => console.log('Complete')
);

// Output (will vary due to random durations):
// Windowed with random duration: [0, 1, 2, 3, 4, 5]
// Windowed with random duration: [6, 7, 8, 9, 10, 11, 12]
// Windowed with random duration: [13, 14, 15, 16]
// Complete

When to use:

Comparison of Key Operators

Operator Buffering Strategy Order Preservation Concurrency Use Case
buffer Event-based Yes N/A Collect values between events
bufferCount Count-based Yes N/A Fixed-size batches
bufferTime Time-based Yes N/A Time-window batching
bufferToggle Start/stop events Yes N/A Complex time windows
bufferWhen Dynamic closing Yes N/A Adaptive buffering
concatMap N/A Yes Sequential Ordered processing
concatMapTo N/A Yes Sequential Same sequence for each value
expand N/A Breadth-first Concurrent Recursive algorithms
exhaustMap N/A First only Blocking Ignore during processing
groupBy N/A Depends on source N/A Categorize by key
map N/A Yes N/A Basic value transformation
mapTo N/A Yes N/A Replace with constant value
mergeMap N/A No Concurrent Parallel processing
mergeScan N/A No Concurrent Stateful async accumulation
partition N/A Yes N/A Split stream by condition
pluck N/A Yes N/A Extract object properties
reduce N/A N/A N/A Final accumulation
scan N/A Yes N/A Running accumulation
switchMap N/A No Switching Latest value wins
switchMapTo N/A No Switching Reset to same Observable
toArray N/A Yes N/A Collect all as array
window Event-based Yes N/A Observable windows between events
windowCount Count-based Yes N/A Observable windows of fixed size
windowTime Time-based Yes N/A Observable windows by time
windowToggle Start/stop events Yes N/A Observable windows with complex conditions
windowWhen Dynamic closing Yes N/A Observable windows with dynamic duration

Practical Examples

Real-time Data Aggregation with bufferTime

import { interval } from 'rxjs';
import { bufferTime, map } from 'rxjs/operators';

// Simulate sensor readings coming in at high frequency (every 100ms)
const sensorReadings$ = interval(100).pipe(
  map(() => ({
    temperature: 20 + Math.random() * 5,
    humidity: 50 + Math.random() * 10,
    timestamp: new Date().toISOString()
  }))
);

// Aggregate readings over 1-second windows
sensorReadings$.pipe(
  bufferTime(1000),
  map(readings => {
    // Skip empty buffers
    if (readings.length === 0) return null;

    // Calculate averages
    const avgTemperature = readings.reduce((sum, reading) => sum + reading.temperature, 0) / readings.length;
    const avgHumidity = readings.reduce((sum, reading) => sum + reading.humidity, 0) / readings.length;

    return {
      avgTemperature: avgTemperature.toFixed(2),
      avgHumidity: avgHumidity.toFixed(2),
      sampleCount: readings.length,
      timeWindow: {
        start: readings[0].timestamp,
        end: readings[readings.length - 1].timestamp
      }
    };
  }),
  // Filter out null results from empty buffers
  filter(result => result !== null)
).subscribe(
  aggregatedData => console.log('Aggregated sensor data:', aggregatedData)
);

// Output:
// Aggregated sensor data: {
//   avgTemperature: "22.34",
//   avgHumidity: "54.21",
//   sampleCount: 10,
//   timeWindow: {
//     start: "2023-06-20T15:30:00.123Z",
//     end: "2023-06-20T15:30:00.923Z"
//   }
// }
// ... (continues every second)

Sequential API Calls with concatMap

import { from, of } from 'rxjs';
import { concatMap, catchError, tap } from 'rxjs/operators';

// Simulate a database with user IDs and their order IDs
const userDatabase = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
  { id: 3, name: 'Charlie' }
];

const orderDatabase = {
  1: [{ id: 101, product: 'Laptop', price: 1200 }, { id: 102, product: 'Phone', price: 800 }],
  2: [{ id: 201, product: 'Tablet', price: 500 }],
  3: [] // No orders
};

// Simulate API calls
function getUser(id) {
  return of(userDatabase.find(user => user.id === id)).pipe(
    delay(500), // Simulate network delay
    tap(user => console.log(`Fetched user: ${user?.name || 'Unknown'}`)),
    catchError(err => {
      console.error(`Error fetching user ${id}:`, err);
      return of(null);
    })
  );
}

function getUserOrders(userId) {
  return of(orderDatabase[userId] || []).pipe(
    delay(500), // Simulate network delay
    tap(orders => console.log(`Fetched ${orders.length} orders for user ${userId}`)),
    catchError(err => {
      console.error(`Error fetching orders for user ${userId}:`, err);
      return of([]);
    })
  );
}

// Process users sequentially, getting their orders
from([1, 2, 3, 4]).pipe(
  concatMap(userId =>
    getUser(userId).pipe(
      concatMap(user => {
        if (!user) return of({ userId, error: 'User not found' });

        return getUserOrders(user.id).pipe(
          map(orders => ({
            user,
            orders,
            totalSpent: orders.reduce((sum, order) => sum + order.price, 0)
          }))
        );
      })
    )
  )
).subscribe(
  result => console.log('Processed result:', result),
  err => console.error('Error in processing:', err),
  () => console.log('All users processed')
);

// Output:
// Fetched user: Alice
// Fetched 2 orders for user 1
// Processed result: {
//   user: { id: 1, name: 'Alice' },
//   orders: [
//     { id: 101, product: 'Laptop', price: 1200 },
//     { id: 102, product: 'Phone', price: 800 }
//   ],
//   totalSpent: 2000
// }
// Fetched user: Bob
// Fetched 1 orders for user 2
// Processed result: {
//   user: { id: 2, name: 'Bob' },
//   orders: [{ id: 201, product: 'Tablet', price: 500 }],
//   totalSpent: 500
// }
// Fetched user: Charlie
// Fetched 0 orders for user 3
// Processed result: {
//   user: { id: 3, name: 'Charlie' },
//   orders: [],
//   totalSpent: 0
// }
// Fetched user: Unknown
// Processed result: { userId: 4, error: 'User not found' }
// All users processed

Conclusion

Transformation operators are the workhorses of RxJS, allowing you to reshape, combine, and process data in powerful ways. By understanding these operators, you can build sophisticated data processing pipelines that handle complex asynchronous scenarios with clean, declarative code:

These transformation operators become even more powerful when combined with other RxJS operators to create comprehensive data processing pipelines tailored to your application’s needs.

References