Package

monifu

reactive

package reactive

Type Members

  1. sealed trait Ack extends Future[Ack]

    Represents the acknowledgement of processing that a consumer sends back upstream on Observer.onNext.

  2. trait Channel[-I] extends AnyRef

    A channel is meant for imperative style feeding of events.

    When emitting events, one doesn't need to follow the back-pressure contract. On the other hand the grammar must still be respected:

    (pushNext)* (pushComplete | pushError)

  3. trait Notification[+T] extends AnyRef

    Used by Observable.materialize.

  4. trait Observable[+T] extends AnyRef

    The Observable interface in the Rx pattern.

    Interface

    An Observable is characterized by an onSubscribe method that needs to be implemented. In simple terms, an Observable might as well be just a function like:

    type Observable[+T] = Subscriber[T] => Unit

    In other words, an Observable provides a side-effecting function that connects a Subscriber to a stream of data. A Subscriber is a cross between an Observer and a Scheduler. A Scheduler is needed when calling subscribe because that is when the side-effects happen, and they require a context capable of scheduling tasks for asynchronous execution. An Observer, on the other hand, is the interface implemented by the consumer; it receives events according to the Rx grammar.

    Because we need the interesting operators and the polymorphic behavior provided by OOP, the Observable is described as an interface whose onSubscribe method has to be implemented:

    class MySampleObservable(unit: Int) extends Observable[Int] {
      def onSubscribe(sub: Subscriber[Int]): Unit = {
        implicit val s = sub.scheduler
        // note we must apply back-pressure
        // when calling `onNext`
        sub.onNext(unit).onComplete {
          case Success(Cancel) =>
            () // do nothing
          case Success(Continue) =>
            sub.onComplete()
          case Failure(ex) =>
            sub.onError(ex)
        }
      }
    }

    Of course, you don't need to inherit from this trait, as you can just use Observable.create, the following example being equivalent to the above:

    Observable.create[Int] { sub =>
      implicit val s = sub.scheduler
      // note we must apply back-pressure
      // when calling `onNext`
      sub.onNext(unit).onComplete {
        case Success(Cancel) =>
          () // do nothing
        case Success(Continue) =>
          sub.onComplete()
        case Failure(ex) =>
          sub.onError(ex)
      }
    }

    The above describes how to create your own Observables; however, Monifu already provides ready-made utilities in the Observable companion object. For example, to periodically make a request to a web service, you could do it like this:

    // just some http client
    import play.api.libs.ws._
    // needed for the `1.second` duration syntax
    import scala.concurrent.duration._
    
    // triggers an auto-incremented number every second
    Observable.intervalAtFixedRate(1.second)
      .flatMap(_ => WS.request("http://some.endpoint.com/request").get())

    As you might notice, in the above example we are calling Observable.flatMap with a function that returns Future instances. This works because Monifu considers Scala's Futures to be just a subset of Observables; see the automatic FutureIsObservable conversion that it defines. Alternatively, you could use Observable.fromFuture, one of the available Observable builders, for explicit conversions.

    Contract

    Observables must obey Monifu's contract, which is why using the already built and tested observables, whenever you can get away with it, is better than implementing your own by inheriting from the interface or by using create. The contract is this:

    • the supplied onSubscribe method MUST NOT throw exceptions; any unforeseen errors that happen in user code must be emitted to the observers and the stream closed
    • events MUST follow this grammar: onNext* (onComplete | onError)
      • a data source can emit zero or many onNext events
      • the stream can be infinite, but when the stream is closed (and not canceled by the observer), then it always emits a final onComplete or onError
    • MUST apply back-pressure when emitting events, which means that sending events is always done in response to demand signaled by observers and that observers should only receive events in response to that signaled demand
      • emitting a new onNext event must happen only after the previous onNext completed with a Continue
      • streaming must stop immediately after an onNext event is acknowledged with a Cancel
      • back-pressure must be applied for the final events as well, so onComplete and onError must happen only after the previous onNext was completed with a Continue acknowledgement
      • the first onNext event can be sent directly, since there are no previous events
      • if there are no previous onNext events, then streams can be closed with onComplete and onError directly
      • if buffering of events happens, it is acceptable for events to get dropped when onError happens such that its delivery is prioritized
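
    The ordering rules above can be checked mechanically on a recorded trace of events. The following is a minimal sketch under stated assumptions, not part of Monifu: Signal and ContractCheck are hypothetical names, and each onNext is recorded together with the acknowledgement it eventually received. It covers only the ordering of events, not the timing aspect of back-pressure.

```scala
// Hypothetical trace model, NOT Monifu types.
sealed trait Signal
case object OnNextContinue extends Signal // an onNext acknowledged with Continue
case object OnNextCancel extends Signal   // an onNext acknowledged with Cancel
case object OnComplete extends Signal
case object OnError extends Signal

object ContractCheck {
  /** True if the trace follows onNext* (onComplete | onError)?
    * and no event is emitted after a Cancel acknowledgement.
    */
  def isValid(trace: List[Signal]): Boolean = trace match {
    case Nil => true                                      // an open or empty stream is fine
    case OnNextContinue :: rest => isValid(rest)          // demand signaled, keep going
    case OnNextCancel :: rest => rest.isEmpty             // nothing may follow a Cancel
    case (OnComplete | OnError) :: rest => rest.isEmpty   // terminal events end the stream
  }
}
```

    For instance, a trace of two acknowledged onNext events followed by OnComplete is accepted, while any trace that emits an event after OnNextCancel or after a terminal event is rejected.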

    On Dealing with the contract

    Back-pressure means in essence that the speed with which the data-source produces events is adjusted to the speed with which the consumer consumes.

    For example, let's say we want to feed an iterator into an observer, similar to what we are doing in Observer.feed. We might build a loop like this:

    import scala.concurrent.Future
    import scala.util.{Failure, Success}
    import scala.util.control.NonFatal
    
    /** Transforms any Iterable into an Observable */
    def fromIterator[T](iterable: Iterable[T]): Observable[T] =
      Observable.create { sub =>
        implicit val s = sub.scheduler
        loop(sub.observer, iterable.iterator).onComplete {
          case Success(Cancel) =>
            () // do nothing
          case Success(Continue) =>
            sub.onComplete()
          case Failure(ex) =>
            reportError(sub.observer, ex)
        }
      }
    
    private def loop[T](o: Observer[T], iterator: Iterator[T])
      (implicit s: Scheduler): Future[Ack] = {
    
      try {
        if (iterator.hasNext) {
          val next = iterator.next()
          // signaling event, applying back-pressure
          o.onNext(next).flatMap {
            case Cancel => Cancel
            case Continue =>
              // signal next event (recursive, but async)
              loop(o, iterator)
          }
        }
        else {
          // nothing left to do, and because we are implementing
          // Observer.feed, the final acknowledgement is a `Continue`
          // assuming that the observer hasn't canceled or failed
          Continue
        }
      }
      catch {
        case NonFatal(ex) =>
          reportError(o, ex)
      }
    }
    
    // the scheduler is taken implicitly, since it is
    // needed for reporting failures
    private def reportError[T](o: Observer[T], ex: Throwable)
      (implicit s: Scheduler): Cancel = {
    
      try o.onError(ex) catch {
        case NonFatal(err) =>
          // oops, onError failed, trying to
          // report both errors somewhere
          s.reportFailure(ex)
          s.reportFailure(err)
      }
      // either way, the stream is finished
      Cancel
    }

    There are cases in which the data-source can't be slowed down in response to the demand signaled through back-pressure. For such cases buffering is needed.

    For example, to "imperatively" build an Observable, we could use channels:

    val channel = PublishChannel[Int](OverflowStrategy.DropNew(bufferSize = 100))
    
    // look mum, no back-pressure concerns
    channel.pushNext(1)
    channel.pushNext(2)
    channel.pushNext(3)
    channel.pushComplete()

    In Monifu, a Channel is much like a Subject, meaning that it can be used to construct observables, except that a Channel has a buffer attached and IS NOT an Observer (as the Subject is). In Monifu (compared to other Rx implementations), Subjects are subject to back-pressure concerns as well, so they can't be used in an imperative way as described above, hence the need for Channels.

    For more serious, lower-level jobs, you can simply take an Observer and wrap it into a BufferedSubscriber.

    See also

    Channel, which is meant for imperatively building Observables without back-pressure concerns

    Subject, which is both an Observable and an Observer

    Cancelable, the type returned by higher level subscribe variants and that can be used to cancel subscriptions

    Subscriber, the cross between an Observer and a Scheduler

    Scheduler, our enhanced ExecutionContext

    Observer, the interface that must be implemented by consumers

  5. trait Observer[-T] extends AnyRef

    The Observer from the Rx pattern is the trio of callbacks that get subscribed to an Observable for receiving events.

    The events received must follow the Rx grammar, which is: onNext* (onComplete | onError)?

    That means an Observer can receive zero or more onNext events, with the stream ending in at most one onComplete or onError (just one, not both). After onComplete or onError, a well-behaved Observable implementation shouldn't send any more onNext events.
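
    One way to picture this grammar is a wrapper that enforces it on behalf of a consumer. The sketch below is illustrative only: ToyObserver is a simplified stand-in whose callbacks return Unit (Monifu's onNext returns an acknowledgement instead), and GrammarGuard and Recorder are hypothetical names that do not come from the library.

```scala
// Simplified stand-in for an Observer; NOT Monifu's interface,
// whose onNext returns an acknowledgement used for back-pressure.
trait ToyObserver[-T] {
  def onNext(elem: T): Unit
  def onComplete(): Unit
  def onError(ex: Throwable): Unit
}

// Hypothetical wrapper that ignores everything after the first
// terminal event. Not thread-safe; assumes sequential calls.
final class GrammarGuard[T](underlying: ToyObserver[T]) extends ToyObserver[T] {
  private var done = false

  def onNext(elem: T): Unit =
    if (!done) underlying.onNext(elem)

  def onComplete(): Unit =
    if (!done) { done = true; underlying.onComplete() }

  def onError(ex: Throwable): Unit =
    if (!done) { done = true; underlying.onError(ex) }
}

// Hypothetical observer that records the events it receives.
final class Recorder[T] extends ToyObserver[T] {
  private val buffer = scala.collection.mutable.ListBuffer.empty[String]

  def onNext(elem: T): Unit = { buffer += s"next:$elem"; () }
  def onComplete(): Unit = { buffer += "complete"; () }
  def onError(ex: Throwable): Unit = { buffer += "error"; () }

  def log: List[String] = buffer.toList
}
```

    Wrapping a Recorder in a GrammarGuard and sending onNext, onComplete, onNext, onError in that order leaves only the first two events in the log, since everything after the terminal event is dropped.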

  6. sealed trait OverflowStrategy extends AnyRef

    Represents the buffering overflow strategy chosen for actions that need buffering, instructing the pipeline what to do when the buffer is full.

    For the available policies, see:

    - Unbounded
    - OverflowTriggering
    - BackPressured

    Used in BufferedSubscriber to implement buffering when concurrent actions are needed, such as in Channels or in Observable.merge.
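
    To make "what to do when the buffer is full" concrete, here is a minimal single-threaded sketch of a buffer that either grows without bound or drops new elements once a limit is reached, in the spirit of the OverflowStrategy.DropNew(bufferSize = 100) value used in the Channel example on this page. ToyOverflow, ToyUnbounded, ToyDropNew and ToyBuffer are hypothetical names, and the real BufferedSubscriber deals with concurrency, which this sketch does not attempt.

```scala
import scala.collection.mutable

// Hypothetical policies, NOT Monifu's OverflowStrategy.
sealed trait ToyOverflow
case object ToyUnbounded extends ToyOverflow
final case class ToyDropNew(bufferSize: Int) extends ToyOverflow

// Hypothetical single-threaded buffer applying the chosen policy.
final class ToyBuffer[T](strategy: ToyOverflow) {
  private val queue = mutable.Queue.empty[T]

  /** Returns true if the element was buffered, false if it was dropped. */
  def offer(elem: T): Boolean = strategy match {
    case ToyUnbounded =>
      queue += elem
      true
    case ToyDropNew(bufferSize) =>
      if (queue.size < bufferSize) { queue += elem; true }
      else false // buffer is full: the new element is dropped
  }

  /** Removes and returns everything buffered so far, oldest first. */
  def drain(): List[T] = {
    val out = queue.toList
    queue.clear()
    out
  }
}
```

    With ToyDropNew(2), the first two offers are accepted and a third is rejected until the buffer is drained.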

  7. trait Subject[I, +T] extends Observable[T] with Observer[I] with LiftOperators2[I, T, Subject]

    A Subject is a sort of bridge or proxy that acts both as an Observer and as an Observable and that must respect the contract of both.

    Because it is an Observer, it can subscribe to an Observable, and because it is an Observable, it can pass through the items it observes by re-emitting them and can also emit new items.

    Useful to build multicast Observables or reusable processing pipelines.
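
    The multicast idea can be sketched with a toy class that has both a consumer side and a producer side. This is an assumption-laden illustration rather than Monifu's Subject: ToySubject is a hypothetical name, subscribers are plain functions, and back-pressure, completion and errors are deliberately left out.

```scala
// Hypothetical, single-threaded toy; NOT Monifu's Subject.
final class ToySubject[T] {
  private var subscribers = Vector.empty[T => Unit]

  // Observable side: register a consumer for future events.
  def subscribe(onNext: T => Unit): Unit =
    subscribers = subscribers :+ onNext

  // Observer side: re-emit each received event to every subscriber.
  def onNext(elem: T): Unit =
    subscribers.foreach(f => f(elem))
}
```

    A subscriber registered after an event was pushed does not see that event in this sketch; it only receives events emitted from then on.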

  8. trait Subscriber[-T] extends Observer[T]

    A Subscriber value is a named tuple of an observer and a scheduler, whose usage is in Observable.create.

    Observable.subscribe takes both an Observer and a Scheduler as parameters; the purpose of a Subscriber is to group them conveniently for Observable.create.

    A Subscriber value is basically an address that the data source needs in order to send events.

Value Members

  1. object Ack
  2. object Notification
  3. object Observable
  4. object Observer
  5. object OverflowStrategy
  6. object Subject
  7. object Subscriber
  8. package channels
  9. package exceptions
  10. package observables
  11. package observers
  12. package streams
  13. package subjects