GraphQL-RxJs

A fork of graphql-js that adds AsyncIterator and Observable (RxJS) support.

Intro

This package adds reactivity to GraphQL resolvers, which means you can now return:

This package also adds reactive directives support:

The package is pretty small because it imports the original graphql-js package and then patches it to provide a reactive execution engine on top of it.

Examples

Documentation

There isn't much to document, because all of the GraphQL documentation is relevant. See the complete documentation at http://graphql.org/ and http://graphql.org/graphql-js/.

Versioning

I'll try to follow along with graphql-js versions, so basically each graphql-js version should have a working graphql-rxjs package to go with it.

API

The library exports the following functions:

AsyncIterator support

export function graphqlReactive(
  schema: GraphQLSchema,
  requestString: string,
  rootValue?: any,
  contextValue?: any,
  variableValues?: {[key: string]: any},
  operationName?: string,
  fieldResolver?: GraphQLFieldResolver<any, any>,
): AsyncIterator<ExecutionResult>;

export function executeReactive(
  schema: GraphQLSchema,
  document: DocumentNode,
  rootValue?: any,
  contextValue?: any,
  variableValues?: {[key: string]: any},
  operationName?: string,
  fieldResolver?: GraphQLFieldResolver<any, any>,
): AsyncIterator<ExecutionResult>;

The signatures match the original GraphQL implementations (graphql and execute), except that each function returns an AsyncIterator instead of a Promise. The AsyncIterator streams immutable results.
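As a rough sketch of what consuming such an AsyncIterator can look like, the snippet below pulls a bounded number of results using only the standard `next()` / `return()` protocol. The `fakeClockResults` generator is a hypothetical stand-in for the iterator returned by graphqlReactive or executeReactive, and `takeResults` is an illustrative helper, not part of this package.

```javascript
// Hypothetical stand-in for graphqlReactive(...): yields three results.
async function* fakeClockResults() {
  for (let i = 1; i <= 3; i++) {
    yield { data: { clock: `tick ${i}` } };
  }
}

// Illustrative helper (not part of graphql-rxjs): pull at most `limit`
// results using only the AsyncIterator protocol.
async function takeResults(iterator, limit) {
  const results = [];
  while (results.length < limit) {
    const { value, done } = await iterator.next();
    if (done) break; // the stream finished before we reached the limit
    results.push(value);
  }
  // Tell the producer we are finished, if it supports early termination.
  if (typeof iterator.return === 'function') {
    await iterator.return();
  }
  return results;
}

takeResults(fakeClockResults(), 2).then((results) => {
  console.log(JSON.stringify(results));
});
```

Calling `return()` when it is available gives a long-lived source (for example a subscription) the chance to release its resources once the consumer stops reading.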

Observable support

export function graphqlRx(
  schema: GraphQLSchema,
  requestString: string,
  rootValue?: any,
  contextValue?: any,
  variableValues?: {[key: string]: any},
  operationName?: string,
  fieldResolver?: GraphQLFieldResolver<any, any>,
): Observable<ExecutionResult>;

export function executeRx(
  schema: GraphQLSchema,
  document: DocumentNode,
  rootValue?: any,
  contextValue?: any,
  variableValues?: {[key: string]: any},
  operationName?: string,
  fieldResolver?: GraphQLFieldResolver<any, any>,
): Observable<ExecutionResult>;

export function subscribeRx(
  schema: GraphQLSchema,
  document: DocumentNode,
  rootValue?: any,
  contextValue?: any,
  variableValues?: {[key: string]: any},
  operationName?: string,
  fieldResolver?: GraphQLFieldResolver<any, any>,
  subscribeFieldResolver?: GraphQLFieldResolver<any, any>
): Observable<ExecutionResult>;

The signatures match the original GraphQL implementations (graphql, execute and subscribe), except that each function returns an Observable instead of a Promise. The Observable streams immutable results.

Preparing schema for GraphQL-RxJs

export function prepareSchema(
  schema: GraphQLSchema,
): GraphQLSchema;

This function prepares a schema for graphql-rxjs by wrapping resolvers, adding reactive directives support, and so on. At the moment, it is invoked automatically when running GraphQL-RxJS.

NOTE: If you are using the original graphql validate function, you have to call prepareSchema manually rather than rely on the automatic invocation.

If you don't want to call it directly, you can use the inner APIs separately:

Reactive Directives
export function addReactiveDirectivesToSchema(
  schema: GraphQLSchema,
): void;

Calling this function on your existing GraphQLSchema object will enable reactive directives support for the schema. More information about reactive directives can be found below.

Observable support in resolvers
export function wrapResolvers(
  schema: GraphQLSchema,
): void;

Calling this function on your existing GraphQLSchema object will enable Observable support in resolvers for the schema.

Getting Started:

  1. The Data source

Let's start off by explaining our Observable, the data source that we are going to stream.

const clockSource = Observable.interval(1000).map(() => new Date()).publishReplay(1).refCount();

It starts as an Observable that emits an incrementing counter (0, 1, 2, ...) once every second:

let source = Observable.interval(1000);

Next, we map each value to the current timestamp:

source = source.map(() => new Date());

Finally, we convert the observable into a hot (multicast) observable.

This means the observable is reference counted: the provider function is triggered only when the first subscriber subscribes. From then on, each new subscriber first receives the last emitted value (without triggering the provider function again), and then all subscribers receive the subsequent values.

When a subscriber unsubscribes, the ref count decreases; only when the counter reaches zero is the unsubscribe function triggered.

const clockSource = source.publishReplay(1).refCount();

This approach will let us manage resources much more efficiently.
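To make the ref-counting behaviour described above concrete, here is a minimal dependency-free sketch of the same idea. It is not how RxJS implements `publishReplay(1).refCount()`; the names `shareLastWithRefCount`, `producer` and `subscribeToClock` are assumptions made for illustration, and clearing the cached value when the last subscriber leaves is a choice of this sketch.

```javascript
// Minimal sketch of "replay the last value + ref count" multicasting.
// `producer(emit)` starts the source and returns a teardown function.
function shareLastWithRefCount(producer) {
  const observers = [];
  let teardown = null;
  let hasLast = false;
  let last;

  return function subscribe(observer) {
    observers.push(observer);
    if (hasLast) observer(last); // late subscriber: replay the last value
    if (observers.length === 1) {
      // First subscriber: start the producer exactly once.
      teardown = producer((value) => {
        hasLast = true;
        last = value;
        observers.slice().forEach((notify) => notify(value));
      });
    }
    let active = true;
    return function unsubscribe() {
      if (!active) return; // ignore repeated unsubscribe calls
      active = false;
      observers.splice(observers.indexOf(observer), 1);
      if (observers.length === 0) {
        // Last subscriber left: stop the producer and drop the cached value.
        teardown();
        teardown = null;
        hasLast = false;
        last = undefined;
      }
    };
  };
}

// Hypothetical clock source: the timer only runs while someone is subscribed.
const subscribeToClock = shareLastWithRefCount((emit) => {
  const timer = setInterval(() => emit(new Date()), 1000);
  return () => clearInterval(timer);
});
```

Calling `subscribeToClock(callback)` returns an unsubscribe function; the interval timer is created on the first subscription and cleared when the last subscriber unsubscribes, so an unused clock costs nothing.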

  2. The Schema

Next, let's look at our schema:

# Root Query
type Query {
  someInt: Int
}

# Root Subscription
type Subscription {
  clock: String
}

The Query type exposes someInt only because the Query type cannot be empty; let's ignore it.

The Subscription type exposes clock, which is basically a periodic timestamp string.

  3. Wrapping them together

So, here is a basic code to demonstrate the concept:

import { Observable } from 'rxjs';
import { makeExecutableSchema } from 'graphql-tools';
import { prepareSchema, graphqlRx } from 'graphql-rxjs';

const clockSource = Observable.interval(1000).map(() => new Date()).publishReplay(1).refCount();

const typeDefs = `
# Root Query
type Query {
  someInt: Int
}

# Root Subscription
type Subscription {
  clock: String
}
`;

const resolvers = {
    Subscription: {
        clock(root, args, ctx) {
              return ctx.clockSource;
        },
    },
};

// Compose the resolvers and typeDefs together.
const schema = makeExecutableSchema({typeDefs: typeDefs, resolvers: resolvers});
prepareSchema(schema);

// Subscribe to the clock
const query = `
  subscription {
    clock
  }
`;

// Calling the reactive version of graphql
graphqlRx(schema, query, null, { clockSource })
.subscribe(console.log.bind(console), console.error.bind(console));

The following responses will be printed to the console:

{"data":{"clock":"Fri Feb 02 2017 20:28:01 GMT+0200 (IST)"}}
{"data":{"clock":"Fri Feb 02 2017 20:28:02 GMT+0200 (IST)"}}
{"data":{"clock":"Fri Feb 02 2017 20:28:03 GMT+0200 (IST)"}}
...

Reactive Directives

This library also implements reactive directives. The following are supported at the moment:

  1. GraphQLDeferDirective (@defer)
  2. GraphQLLiveDirective (@live)

TypeScript support

Just install @types/graphql for initial GraphQL support, then the package will automatically add typings for the new functions it provides.

Benchmarks

      * graphqlOri x 7,496 ops/sec ±9.20% (78 runs sampled)
      * graphqlRx x 6,790 ops/sec ±3.48% (75 runs sampled)
      => Fastest is graphqlOri
    ✓ compare performance for simple query (11088ms)

      * graphqlOri x 3,040 ops/sec ±9.81% (70 runs sampled)
      * graphqlRx x 1,303 ops/sec ±4.19% (74 runs sampled)
      => Fastest is graphqlOri
    ✓ compare performance for deep query (17515ms)
    
      * graphqlOri x 5,360 ops/sec ±20.55% (12 runs sampled)
      * graphqlRx x 3,067 ops/sec ±32.41% (10 runs sampled)
      => Fastest is graphqlOri
    ✓ compare performance for serial mutation (27004ms)

As the results above show, the reactive engine is slower than the original one in all three benchmarks. That was expected: I have yet to put effort into optimizations, and I am planning to optimize in the future.
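To put the gap into numbers, the following sketch computes the fraction of graphqlOri's throughput that graphqlRx reaches in each benchmark. The ops/sec values are copied from the output above; the `benchmarks` array and the `relativeThroughput` helper are illustrative names only. Keep in mind that the serial mutation run reports a large margin of error (±20.55% and ±32.41%), so its ratio is only a rough indication.

```javascript
// Mean ops/sec copied from the benchmark output above.
const benchmarks = [
  { name: 'simple query', graphqlOri: 7496, graphqlRx: 6790 },
  { name: 'deep query', graphqlOri: 3040, graphqlRx: 1303 },
  { name: 'serial mutation', graphqlOri: 5360, graphqlRx: 3067 },
];

// Fraction of the original engine's throughput that graphqlRx reaches.
function relativeThroughput({ graphqlOri, graphqlRx }) {
  return graphqlRx / graphqlOri;
}

for (const benchmark of benchmarks) {
  const percent = (100 * relativeThroughput(benchmark)).toFixed(1);
  console.log(`${benchmark.name}: graphqlRx reaches ${percent}% of graphqlOri`);
}
```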

Issues

If you have found an issue or have an idea, you are welcome to open a new ticket on the Issues page.

Support

This approach feels much more intuitive than the other approaches to streaming results so far. Because of that, I tried to push it upstream into graphql-js, but it was rejected. If you want to support the project, you can follow or thumbs-up the following:

  1. Issue on graphql-js
  2. PR on graphql-js
  3. Draft design for apollo subscriptions

Contributing

All pull requests are welcome. If you think something needs to be done, just open an issue or a PR :)