PureScript-Queue

The pub/sub model captures a simple concept - assign "handlers" to an event manager of some kind, and delegate messages to those handlers by issuing events. This is also considered a "channel" in Haskell - something that stores messages until they are read.

The underlying implementations are pretty simple, because JavaScript is single-threaded. As a result, we have the following architecture:

import Prelude
import Effect (Effect)
import Effect.Console (logShow)
import Queue (Queue, readOnly, writeOnly, READ, WRITE, new, on, put)


main :: Effect Unit
main = do
  (q :: Queue (read :: READ, write :: WRITE) Int) <- new
  
  -- assign a handler
  on (readOnly q) logShow
  
  -- put messages to the queue
  put (writeOnly q) 1
  put q 2 -- Doesn't need to be strictly write-only

The calls to readOnly and writeOnly aren't necessary; they just demonstrate the ability to quarantine sections of your code, where you only wish to expose (read :: READ) or (write :: WRITE) access as the only facilities available on the queue.

The primary purpose of a queue is to decouple function invocation from parameter application - if I don't have a function yet, but I have inputs for it, then I'll just write them to the queue. If I have a function, but no inputs yet, then I'll just add the handler to the queue to await inputs. Furthermore, if you want to remove a handler, you should be able to.
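A minimal sketch of that decoupling, using only the functions from the example above. It assumes the queue holds messages until a handler is attached, as described earlier; the comments describe the intended behavior rather than verified output.

```purescript
module Main where

import Prelude
import Effect (Effect)
import Effect.Console (logShow)
import Queue (Queue, READ, WRITE, new, on, put)

main :: Effect Unit
main = do
  (q :: Queue (read :: READ, write :: WRITE) Int) <- new

  -- Inputs arrive first: there is no function yet, so they are written
  -- to the queue and (by assumption) wait there.
  put q 1
  put q 2

  -- The function arrives later and is attached as a handler.
  on q logShow

  -- From here on, new inputs have a handler waiting for them.
  put q 3
```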

It tries to imitate Chans from Haskell, but isn't nearly as cool: in Haskell, a read in the IO monad can block indefinitely while other threads write to the same channel, whereas our Queues can only do that in the Aff monad (single-threaded, but asynchronous). This functionality is provided by the Queue.Aff module and its siblings.

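There are three flavors of Queue, sorted by their module:

Queue.One - the most lightweight implementation; supports only one handler.
Queue - supports multiple handlers on the same queue.
IxQueue - supports multiple handlers, each identified by a String reference.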

Verbiage

Some less important functions:

Read and Write Scope

Initially, when using new, a queue is both read and write accessible, through the type-level flags READ and WRITE. This may be undesirable for complex networks of queues, where one section of code clearly only supplies data, while another one clearly only consumes it. There are functions for changing this, including readOnly and writeOnly, which narrow a queue to (read :: READ) or (write :: WRITE) access respectively.

A freshly created queue therefore has a type signature that looks something like Queue (read :: READ, write :: WRITE) Foo.
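As a rough sketch of how these scopes can separate producers from consumers: the names producer and consumer below are hypothetical, and the narrowed types Queue (write :: WRITE) Int and Queue (read :: READ) Int are assumed to be what writeOnly and readOnly return.

```purescript
module Main where

import Prelude
import Effect (Effect)
import Effect.Console (logShow)
import Queue (Queue, READ, WRITE, new, on, put, readOnly, writeOnly)

-- Hypothetical producer: its type only grants write access,
-- so it cannot attach handlers to the queue.
producer :: Queue (write :: WRITE) Int -> Effect Unit
producer q = put q 1

-- Hypothetical consumer: its type only grants read access,
-- so it cannot put messages onto the queue.
consumer :: Queue (read :: READ) Int -> Effect Unit
consumer q = on q logShow

main :: Effect Unit
main = do
  (q :: Queue (read :: READ, write :: WRITE) Int) <- new
  consumer (readOnly q)
  producer (writeOnly q)
```

With this arrangement, accidentally calling put inside consumer should be rejected by the type checker rather than discovered at runtime.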

Extra

There is also some extra kit defined in module Queue.Types - debounceStatic, throttleStatic, and intersperseStatic. These functions take similar arguments (a time value and a readable queue) and return a writable queue, along with a thread (fiber).

Generally, in your code, you will be listening to the queue you provide, while writing to the queue these functions return, to get their intended effects.

Note: these variants are called "fooStatic" because they use only a single time value, and so aren't capable of something more advanced like exponential falloff.

Asynchronous message plumbing

This library's additional goal was to aid in asynchronous interop; having some source data originate asynchronously, and the ability to handle it spontaneously. Through the IOQueues and IxQueue.IOQueues modules, we can treat message passing to queues at a higher level, as procedure invocations. We do this by creating two queues - one for handling inputs to the handler(s), and one for returning results from the handler(s).
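The two-queue idea can be sketched by hand with plain queues. This is only an illustration of the concept, not necessarily how IOQueues is implemented internally; the names input and output are hypothetical.

```purescript
module Main where

import Prelude
import Effect (Effect)
import Effect.Console (log)
import Queue (Queue, READ, WRITE, new, on, put)

main :: Effect Unit
main = do
  -- One queue carries inputs to the handler, the other carries results back.
  (input :: Queue (read :: READ, write :: WRITE) Int) <- new
  (output :: Queue (read :: READ, write :: WRITE) Int) <- new

  -- The "procedure": read an input, compute, write the result.
  on input (\i -> put output (i + 1))

  -- The "caller": listen for the result, then send an argument.
  on output (\o -> log ("result: " <> show o))
  put input 20
```

The IOQueues modules wrap this pattern so that the caller sees a single invocation instead of managing both queues directly.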

In the IOQueues and IxQueue.IOQueues modules, the nomenclature is somewhat confusing:

import Prelude
import Data.Either (Either)
import Effect (Effect)
import Effect.Aff (runAff_)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Effect.Exception (Error)
import Queue.One (Queue) as One
-- ^ most lightweight implementation, only one handler
import IOQueues (IOQueues, newIOQueues, registerSync, callAsync)

main :: Effect Unit
main = do
  (io :: IOQueues One.Queue Int Int) <- newIOQueues
  -- "IOQueues queue input output" means "using 'queue', take 'input' and make 'output'."

  let handler :: Int -> Effect Int
      handler i = do
        log $ "input: " <> show i
        let o = i + 1
        log $ "incremented: " <> show o
        pure o
  registerSync io handler -- attach the handler in Effect

  -- `resolveAff` does nothing - it's needed by `runAff_` - see `Effect.Aff` for details
  let resolveAff :: Either Error Unit -> Effect Unit
      resolveAff _ = pure unit

  runAff_ resolveAff do
    result <- callAsync io 20 -- invoke delegated computation in Aff
    liftEffect $ log $ "Should be 21 - Result: " <> show result

Multiple Handlers

module IxQueue.IOQueues is useful when you need async IOQueues calls, but need to integrate into an existing IxQueue with a network of handlers and data supplied. The primary difference with this is that through IxQueue.IOQueues, you need to pass a String reference to identify which handlers will be targeted by what data.

Using the Queue from module Queue as the underlying IOQueues queue might be pretty confusing - it allows multiple registerSync handlers to wait for the same input, which would cause a race condition for the callAsync invocation. However, if you mess with the internal output queue in the IOQueues and add extra handlers, you may broadcast the results of the registerSync handler to multiple areas without race conditions (callAsync only reads from the output queue once). Either way, this use case would probably not be stable or in your favor, and should generally be avoided.

Contributing

This library has grown quite a lot over the years, and I still find it very useful. purescript-react-queue was spawned from this, and I still use it all the time for managing React components.

If you have any ideas you'd like to see in this library, please file an issue or drop me a line. Thanks for using it!