Published on May 15, 2023
RxJS multicasting operators allow you to share a single subscription to an Observable among multiple subscribers. This is crucial for performance optimization and maintaining consistency across your application. Without multicasting, each subscriber would create a separate execution of the Observable, potentially causing duplicate side effects and wasting resources. This post explores the most commonly used multicasting operators with practical examples.
Purpose: Shares a source Observable by using a Subject as a proxy to which multiple Observers can subscribe.
import { interval, Subject } from 'rxjs';
import { multicast, take, tap } from 'rxjs/operators';
// Create a source Observable with a side effect
const source$ = interval(1000).pipe(
take(3),
tap(value => console.log(`Source emitted: ${value}`))
);
// Create a Subject to use as the multicast destination
const subject = new Subject<number>();
// Create a multicasted Observable
const multicasted$ = source$.pipe(
multicast(subject)
);
// First subscription
multicasted$.subscribe(value => console.log(`Observer 1: ${value}`));
// Second subscription (after a delay to demonstrate the shared execution)
setTimeout(() => {
multicasted$.subscribe(value => console.log(`Observer 2: ${value}`));
}, 1500);
// Connect the multicasted Observable to the source
const subscription = multicasted$.connect();
// Later, unsubscribe to clean up
setTimeout(() => {
subscription.unsubscribe();
console.log('Unsubscribed from source');
}, 5000);
// Output:
// Source emitted: 0
// Observer 1: 0
// Source emitted: 1
// Observer 1: 1
// Observer 2: 1
// Source emitted: 2
// Observer 1: 2
// Observer 2: 2
// Unsubscribed from source
When to use:
Purpose: A specialized version of multicast
that uses a regular Subject under the hood.
import { interval } from 'rxjs';
import { publish, tap, take } from 'rxjs/operators';
// Create a source Observable with a side effect
const source$ = interval(1000).pipe(
take(3),
tap(value => console.log(`Source emitted: ${value}`))
);
// Create a published (multicasted) Observable
const published$ = source$.pipe(
publish()
);
// First subscription
published$.subscribe(value => console.log(`Observer 1: ${value}`));
// Second subscription
setTimeout(() => {
published$.subscribe(value => console.log(`Observer 2: ${value}`));
}, 1500);
// Connect to the source
const subscription = published$.connect();
// Later, unsubscribe to clean up
setTimeout(() => {
subscription.unsubscribe();
console.log('Unsubscribed from source');
}, 5000);
// Output:
// Source emitted: 0
// Observer 1: 0
// Source emitted: 1
// Observer 1: 1
// Observer 2: 1
// Source emitted: 2
// Observer 1: 2
// Observer 2: 2
// Unsubscribed from source
When to use:
multicast
but still want manual control over connectionPurpose: Shares a source Observable among multiple subscribers automatically, without requiring manual connection.
import { interval } from 'rxjs';
import { share, tap, take } from 'rxjs/operators';
// Create a source Observable with a side effect
const source$ = interval(1000).pipe(
take(5),
tap(value => console.log(`Source emitted: ${value}`)),
share() // Automatically share the subscription
);
// First subscription
console.log('Observer 1 subscribes');
const subscription1 = source$.subscribe(value => console.log(`Observer 1: ${value}`));
// Second subscription after a delay
setTimeout(() => {
console.log('Observer 2 subscribes');
const subscription2 = source$.subscribe(value => console.log(`Observer 2: ${value}`));
// Unsubscribe the second observer after some time
setTimeout(() => {
console.log('Observer 2 unsubscribes');
subscription2.unsubscribe();
}, 2000);
}, 2000);
// Unsubscribe the first observer after some time
setTimeout(() => {
console.log('Observer 1 unsubscribes');
subscription1.unsubscribe();
}, 4000);
// Output:
// Observer 1 subscribes
// Source emitted: 0
// Observer 1: 0
// Source emitted: 1
// Observer 1: 1
// Observer 2 subscribes
// Observer 2: 1
// Source emitted: 2
// Observer 1: 2
// Observer 2: 2
// Observer 2 unsubscribes
// Source emitted: 3
// Observer 1: 3
// Observer 1 unsubscribes
// (Source stops emitting as all subscribers are gone)
When to use:
Purpose: Shares a source Observable and replays a specified number of emissions to new subscribers.
import { interval } from 'rxjs';
import { shareReplay, take, tap } from 'rxjs/operators';
// Create a source Observable with a side effect
const source$ = interval(1000).pipe(
take(5),
tap(value => console.log(`Source emitted: ${value}`)),
shareReplay(2) // Share and replay the last 2 values
);
// First subscription
console.log('Observer 1 subscribes');
source$.subscribe(value => console.log(`Observer 1: ${value}`));
// Second subscription after some emissions have occurred
setTimeout(() => {
console.log('Observer 2 subscribes (late)');
source$.subscribe(value => console.log(`Observer 2: ${value}`));
}, 3500);
// Output:
// Observer 1 subscribes
// Source emitted: 0
// Observer 1: 0
// Source emitted: 1
// Observer 1: 1
// Source emitted: 2
// Observer 1: 2
// Source emitted: 3
// Observer 1: 3
// Observer 2 subscribes (late)
// Observer 2: 2 (replayed)
// Observer 2: 3 (replayed)
// Source emitted: 4
// Observer 1: 4
// Observer 2: 4
When to use:
Operator | Auto-Connect | Replay Values | Reference Counting | Use Case |
---|---|---|---|---|
multicast |
No | Depends on Subject | No | Fine-grained control with custom Subjects |
publish |
No | No | No | Manual connection with standard Subject |
share |
Yes | No | Yes | Most common sharing scenarios |
shareReplay |
Yes | Yes | Yes (configurable) | Caching and late subscriber scenarios |
import { Observable, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { shareReplay, catchError, map } from 'rxjs/operators';
// Service class for fetching user data
class UserService {
private userCache$: Observable<any>;
constructor() {
// Initialize the cache
this.userCache$ = null;
}
getUsers() {
// If we already have a cached Observable, return it
if (this.userCache$) {
console.log('Returning cached users data');
return this.userCache$;
}
// Otherwise, create a new request and cache it
console.log('Fetching users from API');
this.userCache$ = ajax.getJSON('https://jsonplaceholder.typicode.com/users').pipe(
map(response => response),
catchError(error => {
console.error('Error fetching users:', error);
return of([]);
}),
// Cache the result indefinitely (or until service is reset)
shareReplay(1)
);
return this.userCache$;
}
// Method to clear the cache if needed
clearCache() {
this.userCache$ = null;
console.log('Cache cleared');
}
}
// Usage example
const userService = new UserService();
// First request - will fetch from API
userService.getUsers().subscribe(users => {
console.log('First component received users:', users.length);
});
// Second request - should use cached data
setTimeout(() => {
userService.getUsers().subscribe(users => {
console.log('Second component received users:', users.length);
});
}, 2000);
// Third request after cache clear - will fetch from API again
setTimeout(() => {
userService.clearCache();
userService.getUsers().subscribe(users => {
console.log('Third component received users after cache clear:', users.length);
});
}, 4000);
import { Subject } from 'rxjs';
import { share, filter, map } from 'rxjs/operators';
// Simple event bus implementation
class EventBus {
private events$ = new Subject<{ type: string, payload: any }>();
// Shared stream of events
public sharedEvents$ = this.events$.pipe(
share()
);
// Dispatch an event
dispatch(type: string, payload: any) {
this.events$.next({ type, payload });
}
// Listen for specific event types
on(eventType: string) {
return this.sharedEvents$.pipe(
filter(event => event.type === eventType),
map(event => event.payload)
);
}
}
// Usage example
const eventBus = new EventBus();
// Component 1 listens for 'USER_UPDATED' events
const userSubscription = eventBus.on('USER_UPDATED').subscribe(userData => {
console.log('User component received update:', userData);
});
// Component 2 listens for 'NOTIFICATION' events
const notificationSubscription = eventBus.on('NOTIFICATION').subscribe(notification => {
console.log('Notification received:', notification);
});
// Dispatch some events
eventBus.dispatch('USER_UPDATED', { id: 1, name: 'John Doe', status: 'active' });
eventBus.dispatch('NOTIFICATION', { message: 'New message received', type: 'info' });
eventBus.dispatch('USER_UPDATED', { id: 1, name: 'John Doe', status: 'away' });
// Clean up
setTimeout(() => {
userSubscription.unsubscribe();
notificationSubscription.unsubscribe();
console.log('Subscriptions cleaned up');
}, 5000);
Multicasting operators are essential for optimizing Observable streams in RxJS applications. They help you avoid duplicate side effects, reduce resource consumption, and maintain consistency across your application:
multicast
when you need fine-grained control with custom Subjectspublish
for manual connection with a standard Subjectshare
⭐ for most common sharing scenarios with automatic reference countingshareReplay
⭐ for caching and ensuring late subscribers receive important valuesBy choosing the right multicasting operator for your specific use case, you can significantly improve the performance and behavior of your reactive applications.