angel_task 1.1.1

task

Support for running and scheduling asynchronous tasks within Angel. Parameters can be injected into scheduled tasks with the same dependency injection system used by Angel.

Installation

In your pubspec.yaml file:

  dependencies:
    angel_framework: ^1.0.0
    angel_task: ^1.0.0

Usage

main() async {
  var app = await createServer();
  var scheduler = new AngelTaskScheduler(app);

  // Run a one-off task, with an optional delay.
  scheduler.once((Todo singleton) {
    print('3 seconds later, we found our Todo singleton: "${singleton.text}"');
  }, new Duration(seconds: 3));

  Task foo;
  int i = 0;

  // Periodically run functionality
  foo = scheduler.seconds(1, () {
    print('Printing ${++i} time(s)!');

    if (i >= 3) {
      print('Cancelling foo task...');
      foo.cancel();
    }
  });
  
  // Named tasks!
  var greetTask = scheduler.minutes(3, (String message) => print(message), name: 'greet');
  
  // You can still access services, etc., and thus manipulate databases from tasks.
  scheduler.once(() {
    app.service('foo').create({'foo': 'bar'});
  });

  // If you never start the scheduler, no tasks will ever run.
  await scheduler.start();
  
  // Run a named task
  var result = await scheduler.run('greet', ['Hello, world!']);
}

Make sure to start the scheduler. Otherwise, your tasks will never run:

main() async {
  // ...
  await scheduler.start();
}

You can also listen to a Stream of a task's results:

main() async {
  // ...
  var task = scheduler.minutes(3, fibonacci, args: [13]);
  var fib13 = await task.results.first;
}
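
The fibonacci callback used above is not defined in this README. As a minimal sketch, assuming it takes a single int and returns an int (the name and signature here are illustrative and not part of angel_task), such a task function might look like this:

```dart
/// Hypothetical task function: returns the n-th Fibonacci number,
/// where fibonacci(0) == 0 and fibonacci(1) == 1.
int fibonacci(int n) {
  if (n < 0) throw new ArgumentError.value(n, 'n', 'must be non-negative');
  var a = 0, b = 1;
  // After each iteration, `a` holds the next Fibonacci number.
  for (var i = 0; i < n; i++) {
    var next = a + b;
    a = b;
    b = next;
  }
  return a;
}

void main() {
  print(fibonacci(13)); // 233
}
```

Because the result is a plain int, a function like this should also be suitable when return values have to be sent to clients.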

Multithreading

Angel's task engine also supports communication over isolates, so you can run all tasks in an isolate separate from the ones serving HTTP, and long-running tasks do not slow down request handling.

For example, if your processor has 4 cores, you might spawn 4 total isolates:

  • 3 child isolates, all running the server
  • 1 master isolate, which runs the task engine

The master isolate can be dedicated to just running tasks, with no HTTP responsibility. The child isolates only have to worry about serving your application to the Web.

The AngelTaskScheduler has a receivePort that it uses to communicate with clients.

To access the scheduler as a client, instantiate an AngelTaskClient. The constructor accepts a single SendPort, which in this case comes from the scheduler's receivePort.

IMPORTANT: If you want to send the return values of task functions to clients, set sendReturnValues to true in the AngelTaskScheduler constructor. In that case, the results of your task callbacks must be primitive Dart values that can be serialized over a SendPort.

Use this as a mechanism to query the state of the master isolate. You might find that you won't need it, though.

main() async {
  var nInstances = Platform.numberOfProcessors - 1;
  
  // This instance won't actually serve HTTP, we just use it for DI.
  var app = await createServer();
  var scheduler = new AngelTaskScheduler(app);
  
  // Start listening, running tasks, etc.
  await scheduler.start();
  
  for (int i = 0; i < nInstances; i++) {
    // Spawn child nodes now. Make sure to send the scheduler's SendPort.
    Isolate.spawn(isolateMain, [i, scheduler.receivePort.sendPort]);
  }
}

/// The code that runs in the child nodes, i.e., the ones running the application.
void isolateMain(List args) {
  int id = args[0];
  SendPort masterPort = args[1];
  var app = new Angel.custom(startShared);
  
  // Hook up a task client, then start the server.
  app.configure(taskClientPlugin(masterPort)).then((_) async {
    var server = await app.startServer(InternetAddress.ANY_IP_V4, 3000);
    print('Instance #$id listening at http://${server.address.address}:${server.port}');
  });
}

/// A simple plug-in that connects a server instance to the master task scheduler.
AngelConfigurer taskClientPlugin(SendPort masterPort) {
  return (Angel app) async {
    var client = new AngelTaskClient(masterPort);
    await client.connect(); // Await a connection...
    // Or, with an optional timeout:
    // await client.connect(timeout: new Duration(seconds: 30));
    
    // If we inject the AngelTaskClient as a singleton, we can access it in routes.
    app.container.singleton(client);
    
    // We can dispatch tasks, without waiting for the result.
    app.get('/dispatch', (AngelTaskClient client) {
      client.run('foo', args: ['bar']);
    });
    
    // We can also await the results of tasks.
    app.get('/fibonacci/:number([0-9]+)', (AngelTaskClient client, String number) async {
      var n = int.parse(number);
      var taskResult = await client.run('fibonacci', args: [n]);
      
      if (!taskResult.successful) {
        // Access error and stack trace on failure
        print(taskResult.error);
        print(taskResult.stack);
        throw new AngelHttpException.notProcessable();
      }
      
      // If `sendReturnValues` is `true` in our `AngelTaskScheduler`, then we can
      // also access the value returned from the task function.
      var computation = taskResult.value;
      return {'value': computation};
    });
  };
}

Sockets

Applications of very large scale will likely run on multiple machines; in such a case, mere multi-threading will not do the job.

Luckily, package:angel_task also supports communication over sockets. This can occur at the same time as communication over isolates, so you can use both together in your application.

To bind the scheduler to a socket, pass a ServerSocket to the AngelTaskScheduler constructor.

main() async {
  var socket = await ServerSocket.bind(InternetAddress.ANY_IP_V4, 5671);
  var app = await createServer();
  var scheduler = new AngelTaskScheduler(app, socket: socket);
  
  // Task configuration...
  
  // Calling `start` will also listen on the socket, if any.
  await scheduler.start(); 
}

And then, to connect a client, it's almost the same process:

main() async {
  var app = await createServer();
  var client = await AngelTaskClient.connectSocket(InternetAddress.LOOPBACK_IP_V4, 5671);
 
  // You can still inject for DI.
  //
  // Just make sure to explicitly pass the right type.
  app.container.singleton(client, as: AngelTaskClient);
  
}

Broadcasting

If you are multi-threading, there may be times when you want to send an impromptu message to all child nodes. Use AngelTaskScheduler.broadcast to achieve this.

Whatever you broadcast should be a primitive Dart value; otherwise, it will not be serializable over a SendPort or Socket.

main() {
  // ...
  scheduler.broadcast({
    'michael': 'jackson'
  });
}
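
To see what a "primitive" value means in practice, here is a minimal sketch that uses only dart:isolate and does not depend on angel_task. It sends a map of strings through a SendPort and reads it back. The ReceivePort and the roundTrip helper are stand-ins for illustration, not the scheduler's own port or API.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical helper: sends [message] through a throwaway port
/// and returns whatever arrives on the other side.
Future roundTrip(message) async {
  var port = new ReceivePort();
  port.sendPort.send(message);
  // `first` completes with the first message and then closes the port.
  return await port.first;
}

main() async {
  var received = await roundTrip({'michael': 'jackson'});
  print(received['michael']); // jackson
}
```

Maps, lists, strings, numbers, booleans and null pass through in this way. Objects that hold native resources, such as sockets, cannot be sent.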

1. Depend on it

Add this to your package's pubspec.yaml file:


dependencies:
  angel_task: "^1.1.1"

2. Install it

You can install packages from the command line:

with pub:


$ pub get

Alternatively, your editor might support pub get. Check the docs for your editor to learn more.

3. Import it

Now in your Dart code, you can use:


import 'package:angel_task/angel_task.dart';
        
Version   Uploaded
1.1.1     Nov 18, 2017
1.1.0+1   Jul 9, 2017
1.1.0     Jul 9, 2017
1.0.0     Mar 1, 2017

Analysis

We analyzed this package, and provided a score, details, and suggestions below.

  • completed on Feb 7, 2018
  • Dart: 2.0.0-dev.20.0
  • pana: 0.10.1

Scores

  • Popularity: 0 / 100. Describes how popular the package is relative to other packages.
  • Health: 97 / 100. Code health derived from static analysis.
  • Maintenance: 80 / 100. Reflects how tidy and up-to-date the package is.
  • Overall score: 45. Weighted score of the above.

Platforms

Detected platforms: other

Primary library: package:angel_task/angel_task.dart with components: io, mirrors.

Suggestions

  • Maintain CHANGELOG.md.

    Changelog entries help clients to follow the progress in your code.

  • Maintain an example.

    None of the files in your example/ directory match a known example pattern. Common file name patterns include main.dart and example.dart; you could also use angel_task.dart.

Dependencies

Package                 Constraint   Resolved        Available

Direct dependencies
Dart SDK                >=1.19.0
angel_framework         ^1.0.0       1.1.1
uuid                    ^0.5.3       0.5.3

Transitive dependencies
angel_http_exception                 1.0.0
angel_model                          1.0.0
angel_route                          2.0.5
async                                2.0.3
body_parser                          1.0.3
charcode                             1.1.1
collection                           1.14.5
combinator                           1.0.0-beta+7
container                            0.1.2
convert                              2.0.1
crypto                               2.0.2+1
http_server                          0.9.7
json_god                             2.0.0-beta+1
logging                              0.11.3+1
matcher                              0.12.1+4
merge_map                            1.0.0
meta                                 1.1.2
mime                                 0.9.6
path                                 1.5.1
pool                                 1.3.4
quiver_hashcode                      1.0.0
random_string                        0.0.1
source_span                          1.4.0
stack_trace                          1.9.1
string_scanner                       1.0.2
tuple                                1.0.1
typed_data                           1.1.5

Dev dependencies
mock_request            ^1.0.0
test                    ^0.12.0