Quick Contact

    Angular 8 Observables

    Observables provide support for passing messages between publisher and subscription in our application. The observables can deliver the multiple values of any type like literal, messages, depending on the content.

    It is an object that defines the callback method to handle the three types of notification that an observable can send. These are the following.

    • Next: Required.

      The handler for each delivered value called zero or more type after execution start.

    • Error: Optional.

      The handler is used for error warning. An error stops the execution of the observable instance.

    • Complete: Optional.

      The execution for the execution complete notification. Delayed value can continue delivering to the next handler after the execution complete.

    An observable deliver multiple values of any type-literals, messages, or events, depending on the context. The API for receiving values is the same whether the values are offered on the context. Because setup and teardown logic are handled by the observables, And our application code only needs to subscribing to consume values, and when it done, or unsubscribing.

    Basic usage and terms

    As a publisher, we create an observer instance that defines a subscriber function. It is the function that is executed when a consumer calls the subscribe() method.

    To execute the observable we have created and begun receiving notifications, we call the subscribe() method, passing an observer. This is a JavaScript object which defines the handlers for the notifications we receive. The subscribe () call returns a subscription object than the unsubscribe () method, what we call to stop receiving notifications.

    Example
    
    	Observe geolocation updates
    	 // Create an Observable which will start listening to gelolocation updates
    	 // when a consumer subscribed.
    	const locations = new Observable((observer) => {
    	   // Get the next callbacks. It will be passed in when 
    	const {next, error} = observer;
    	   letwatchId; 
    	if ('geolocation' in navigator) {
    	     watchId = navigator.geolocation.watchPosition(next, error);
    	   } else {
    	     error('Geolocation not available');
    	   }
    	return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
    	 }); 
    	constlocationsSubscription = locations.subscribe({
    	next(position) {console.log('Current Position: ', position); },
    	error(msg) { console.log('Error Getting Location: ', msg); }
    	 });
    	setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
    	
    Creating observable

    Use the observable constructor to create any observable stream of any type of method. The constructor makes an its argument the subscriber function to run when the observable’s subscribe( ) method executes. A subscriber function receiver an observer object, and can publish values to the observer’s next() method.

    For Example
    Create observable with constructor
    
    	// The function runs when subscribe() is called in the function.
    	functionsequenceSubscriber(observer) {
    	  //It synchronously deliver 1, 2, and 3, then complete 
    	   observer.next(1);
    	   observer.next(2);
    	   observer.next(3);
    	   observer.complete();
    	  return {unsubscribe() {}}; 
    	 }
    	const sequence = new Observable(sequenceSubscriber);
    	sequence.subscribe({
    	   next(num) { console.log(num); },
    	   complete() { console.log('Finished sequence'); }
    	 }); 
    	 // Logs:
    	 // 1..
    	 // 2..
    	 // 3..
    	 // Finished sequence
    	
    To take this example a little further, we can create an observable which publishes events.
    Example:
    Create with custom fromEvent function
    
    	functionfromEvent(target, eventName) {
    	   return new Observable((observer) => {
    	     const handler = (e) =>observer.next(e);
    	     // Add the event handler to the target
    	     target.addEventListener(eventName, handler);
    	return () => {
    	       // Detach the event handler from the target
    	       target.removeEventListener(eventName, handler); 
    	     };
    	   });
    	 }
    	

    Now we can use function to create an observable that publishes keydown events:

    Use custom fromEvent function
    
    	const ESC_KEY = 27;
    	constnameInput = document.getElementById('name') as HTMLInputElement;
    	const subscription = fromEvent(nameInput, 'keydown')
    	   .subscribe((e: KeyboardEvent) => {
    	     if (e.keyCode === ESC_KEY){ 
    	   nameInput.value = '';
    	   } 
    	   });
    	
    Multicasting

    An Observable creates a new, independent execution for the subscribed observer. When an observer subscribes, the observable wire up an event handler and delivers the value to the observer. When a second observer subscribes, the observables then it wires up a new event handler and delivers values to the second observer in a separate execution.

    Sometimes, In place of starting an independent execution for every subscriber, we want each subscription to get the same values-even if values have already begun to emit.

    Let’s see an example that counts from 1 to 3, with a one-second delay after each number emitted.
    Create a delayed sequence
    
    	functionsequenceSubscriber(observer) {
    	  constseq = [1, 2, 3];
    	   lettimeoutId;
    	   functiondoSequence(arr, idx) {
    	     timeoutId = setTimeout(() => {
    	       observer.next(arr[idx]);
    	       if (idx === arr.length - 1) { 
    	         observer.complete();
    	       } else {
    	         doSequence(arr, ++idx);
    	       }
    	     }, 1000);
    	   }
    	doSequence(seq, 0); 
    	   return {unsubscribe() {
    	     clearTimeout(timeoutId);
    	   }};
    	 }
    	 //It Create a new Observable that will deliver the above sequence 
    	const sequence = new Observable(sequenceSubscriber);
    	sequence.subscribe({
    	   next(num) { console.log(num); },
    	   complete() { console.log('Finished sequence'); }
    	 });
    	 // Logs:
    	 // (at 1 second): 1 
    	 // (at 2 seconds): 2
    	 // (at 3 seconds): 3
    	 // (at 3 seconds): Finished sequence
    	

    Notice that if we subscribe twice, there will be two separate streams, each emitting values every second. It looks something like this:

    Two subsciptions
    
    	// Subscribe starts the clock, and emit after per second 
    	sequence.subscribe({
    	next(num) { console.log('1st subscribe: ' + num); },
    	complete() { console.log('1st sequence finished.'); }
    	 });
    	 // After 1/2 second, subscribe again.
    	setTimeout(() => {
    	sequence.subscribe({
    	next(num) { console.log('2nd subscribe: '+ num); },
    	complete() { console.log('2nd sequence finished.'); }
    	   });
    	 }, 700)
    	 // Log:
    	 // (In 1 second): 1st subscribe: 1
    	 // (In 1.5 seconds): 2nd subscribe: 1 
    	 // (In 2 seconds): 1st subscribe: 2
    	 // (In 2.5 seconds): 2nd subscribe: 2
    	 // (In 3 seconds): 1st subscribe: 3
    	 // (In 3 seconds): 1st sequence finished
    	 // (In 3.5 seconds): 2nd subscribe: 3
    	 // (In 3.5 seconds): 2nd sequence finished
    	
    Changing the observable to be multicasting look something like this:
    Create a multicast Subscriber
    
    	functionmulticastSequenceSubscriber(){
    	   constseq = [1, 2, 3];
    	const observers = [];
    	   lettimeoutId;
    	  (runs when subscribe()
    	   // function is invoked)
    	   return (observer) => {
    	     observers.push(observer); 
    	     if (observers.length === 1) {
    	       timeoutId = doSequence({
    	         next(val) {
    	           observers.forEach(obs =>obs.next(val));
    	         },
    	         complete() {
    	           observers.slice(0).forEach(obs =>obs.complete()); 
    	         }
    	       }, seq, 0);
    	     }
    	  return {
    	       unsubscribe() {
    	         observers.splice(observers.indexOf(observer), 1);
    	         if (observers.length === 0) { 
    	           clearTimeout(timeoutId);
    	         }
    	       }
    	     };
    	   };
    	 } 
    	functiondoSequence(observer, arr, idx) {
    	   returnsetTimeout(() => {
    	     observer.next(arr[idx]);
    	     if (idx === arr.length - 1) {
    	       observer.complete();
    	     } else {
    	       doSequence(observer, arr, ++idx); 
    	     }
    	   }, 1000);
    	 }
    	constmulticastSequence = new Observable(multicastSequenceSubscriber());
    	 // Subscribe starts the clock, and begins to emit after 1 second
    	multicastSequence.subscribe({
    	   next(num) { console.log('1st subscribe: ' + num); },
    	   complete() { console.log('1st sequence finished.'); }
    	 });
    	 // After 1 1/2 seconds, subscribe again (should "miss" the first value).
    	setTimeout(() => {
    	   multicastSequence.subscribe({
    	     next(num) { console.log('2nd subscribe: ' + num); },
    	     complete() { console.log('2nd sequence finished.'); } 
    	   });
    	 }, 1500);
    	 // Logs:
    	 // (at 1 second): 1st subscribe: 1
    	 // (at 2 seconds): 1st subscribe: 2
    	 // (at 2 seconds): 2nd subscribe: 2
    	 // (at 3 seconds): 1st subscribe: 3
    	 // (at 3 seconds): 1st sequence finished 
    	 // (at 3 seconds): 2nd subscribe: 3
    	 // (at 3 seconds): 2nd sequence finished
    	
    Error handling

    Because observables produce values asynchronously, try/catch will not effectively catch errors. Instead, we handle errors by specifying an error callback on the observer. An observable can produce values (calling the next callback), or it can complete, calling either the complete or error also causes the observable to clean up subscriptions and stop producing values.

    
    	myObservable.subscribe({
    	   next(num) { console.log('Next num: ' + num)},
    	   error(err) { console.log('Received an errror: ' + err)}
    	 });
    	

    Copyright 1999- Ducat Creative, All rights reserved.