Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2b5c6f5
Rewrite using amqplib
nathan-muir Nov 8, 2018
29486f6
wip
nathan-muir Jan 8, 2020
e574d8f
more wip
nathan-muir Jan 27, 2020
648737b
Prevent error message on multiple calls to `close()`
nathan-muir Jan 28, 2020
a36c1a2
protocol: include task type in event message
nathan-muir Jan 28, 2020
be8e095
bump to 5.0.0-alpha.5
nathan-muir Jan 28, 2020
f10ef27
remove requeue when nack'ing
nathan-muir Sep 23, 2020
2d3ccd3
bump to 5.0.0-alpha.6
nathan-muir Sep 23, 2020
0da2c3d
improve nack error message
nathan-muir Sep 23, 2020
f9d7d2b
bump to 5.0.0-alpha.7
nathan-muir Sep 23, 2020
4040aa7
chore: update dependencies
nathan-muir Feb 8, 2022
4176bdd
bump to 5.0.0-alpha.8
nathan-muir Feb 8, 2022
e925108
update celery test
nathan-muir Feb 10, 2022
46289ce
update eslint, prettier & rollup config
nathan-muir Aug 18, 2023
1fc9424
update amqplib & rxjs, remove bluebird
nathan-muir Aug 18, 2023
e3c6dff
bump to 5.0.0-alpha.9
nathan-muir Aug 18, 2023
5cf7ff0
update .npmignore
nathan-muir Aug 18, 2023
db36f42
update uuid
nathan-muir Aug 18, 2023
086c9e4
bump to 5.0.0-alpha.10
nathan-muir Aug 18, 2023
dd80244
Update package.json
nathan-muir Aug 23, 2023
2ecdc2e
bump to 5.0.0-alpha.13
nathan-muir Aug 23, 2023
c3d8ba8
fix: skip destroying tasks on closed backend
nathan-muir Feb 18, 2025
f207a74
bump to 5.0.0-alpha.14
nathan-muir Feb 18, 2025
eb5a552
chore: npm install
nathan-muir Apr 14, 2025
837856f
feat: update celery & python to latest
nathan-muir Apr 14, 2025
ec24c6b
feat: support priority queues
nathan-muir Apr 14, 2025
4c54c71
bump to 5.0.0-alpha.15
nathan-muir Apr 14, 2025
0d2d587
add loose types
nathan-muir Mar 8, 2026
018cc9c
bump to 5.0.0
nathan-muir Mar 8, 2026
0abeb12
update type for `.call` to handle `ignoreResult: true`
nathan-muir Mar 8, 2026
44d2a2a
bump to 5.0.1
nathan-muir Mar 8, 2026
7876da7
chore `npm install`
nathan-muir Apr 13, 2026
65daa02
install amqplib@1
nathan-muir Apr 13, 2026
5c2f2b9
bump to 5.1.0-alpha.1
nathan-muir Apr 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
.idea/
benchmarks/
env/
examples/
meteor/
rollup.config.js
src/
test/
README.md
124 changes: 65 additions & 59 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ If you are new to Celery check out http://celeryproject.org/

## Differences with `node-celery`

1. This library is now based on [amqp-coffee](https://github.com/dropbox/amqp-coffee),
1. This library is now based on [amqplib](https://github.com/squaremo/amqp.node),
instead of [node-amqp](https://github.com/postwait/node-amqp).

2. `EventEmitter` based code has been removed; only pure callbacks are available.
2. `EventEmitter` based code has been removed; only promises are available.

3. Support for the Redis Backend has been removed.

Expand All @@ -30,15 +30,15 @@ If you are new to Celery check out http://celeryproject.org/

Simple example, included as [examples/hello-world.js](https://github.com/3stack-software/celery-shoot/blob/master/examples/hello-world.js):

```javascript
var celery = require('celery-shoot'),
client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(err){
assert(err == null);

var task = client.createTask('tasks.echo');
task.invoke(["Hello Wolrd"], function(err, result){
console.log(err, result);
})
```js
import { withClient } from 'celery-shoot';

withClient('amqp://guest:guest@localhost:5672//', {}, async client => {
const result = await client.call({
name: 'tasks.error',
args: ['Hello World'],
});
console.log('tasks.echo response:', result);
});
```

Expand All @@ -47,20 +47,20 @@ client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(e

The ETA (estimated time of arrival) lets you set a specific date and time that is the earliest time at which your task will be executed:

```javascript
var celery = require('celery-shoot'),
client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(err){
assert(err == null);

var task = client.createTask('tasks.send_email', {
eta: 60 * 60 * 1000 // execute in an hour from invocation
}, {
ignoreResult: true // ignore results
```js
import { withClient } from 'celery-shoot';

withClient('amqp://guest:guest@localhost:5672//', {}, async client => {
await client.call({
name: 'tasks.send_email',
kwargs: {
to: 'to@example.com',
title: 'sample email',
},
eta: 30 * 1000,
ignoreResult: true,
});
task.invoke([], {
to: 'to@example.com',
title: 'sample email'
})
console.log('tasks.send_email sent');
});
```

Expand All @@ -69,59 +69,65 @@ client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(e
The expires argument defines an optional expiry time, a specific date and time using Date:

```javascript
var celery = require('celery-shoot'),
client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(err){
assert(err == null);
import { withClient } from 'celery-shoot';

var task = client.createTask('tasks.sleep', {
eta: 60 * 60 * 1000 // expire in an hour
withClient('amqp://guest:guest@localhost:5672//', {}, async client => {
await client.call({
name: 'tasks.sleep',
args: [2 * 60 * 60],
expires: 1000, // in 1s
});
task.invoke([2 * 60 * 60], function(err, res){
console.log(err, res);
})
});
```

### Routing

The simplest way to route tasks to different queues is using `options.routes`:

```javascript
var celery = require('celery-shoot'),
client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', {
routes: {
'tasks.send_mail': {
'queue': 'mail'
You can also configure custom routers, similar to http://celery.readthedocs.org/en/latest/userguide/routing.html#routers

```js
import { withClient } from 'celery-shoot';


const routes = [
{
'tasks.send_mail': {
queue: 'mail',
},
},
(task, args, kwargs) => {
if(task === 'myapp.tasks.compress_video'){
return {
'exchange': 'video',
'routingKey': 'video.compress'
}
}
return null;
}
}, function(err){
assert(err == null);

var task = client.createTask('tasks.send_email');
task.invoke([], {
to: 'to@example.com',
title: 'sample email'
];

withClient('amqp://guest:guest@localhost:5672//', { routes }, async client => {
await client.call({
name: 'tasks.send_email',
kwargs: {
to: 'to@example.com',
title: 'sample email',
},
ignoreResult: true,
});
var task2 = client.createTask('tasks.calculate_rating');
task2.invoke([], {
item: 1345
await client.call({
name: 'tasks.calculate_rating',
kwargs: {
item: 1345,
},
});
});
```

You can also configure custom routers, similar to http://celery.readthedocs.org/en/latest/userguide/routing.html#routers


```js
var myRouter = function(task, args, kwargs){
if(task === 'myapp.tasks.compress_video'){
return {
'exchange': 'video',
'routingKey': 'video.compress'
}
}
return null;
}
var myRouter =
Client({
routes: [myRouter]
});
Expand Down
51 changes: 27 additions & 24 deletions benchmarks/pub.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
const assert = require('assert');
const each = require('async/each');
const celery = require('../src/celery');
import { withClient } from '../dist/celery-shoot.esm.js';

const AMQP_HOST = process.env.AMQP_HOST || 'amqp://guest:guest@localhost//';

const n = parseInt(process.argv.length > 2 ? process.argv[2] : 100, 10);
const L = `tasksSent (n=${n})`;

function* series(a) {
for(let i = 0; i < a; i += 1) {
yield i;
}
}

var client = celery.connectWithUri(AMQP_HOST, function (err) {
assert(err == null);
const task = client.createTask('tasks.add', {}, { ignoreResult: true });
console.time(L);
each(series(n), function (i, cb) {
task.invoke([i, i], cb);
}, function (err) {
console.timeEnd(L);
if (err) {
console.error(err)
withClient(
AMQP_HOST,
{
sendTaskSentEvent: false,
},
async (client) => {
console.time(L);
for (let i = 0; i < n; i++) {
try {
const { writeResult } = client.call({
name: 'tasks.add',
args: [i, i],
ignoreResult: true, // specifically ignore results for speed test
});
if (!writeResult) {
console.log('waiting', i);
await client.waitForDrain();
}
} catch (err) {
console.log('error when i=', i, err);
throw err;
}
}
client.close(function() {
console.log('done')
});
});
});
await client.waitForDrain();
console.timeEnd(L);
},
);
61 changes: 37 additions & 24 deletions benchmarks/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,44 @@
const assert = require('assert');
const each = require('async/each');
const celery = require('../src/celery');
import { withClient } from '../dist/celery-shoot.esm.js';

const AMQP_HOST = process.env.AMQP_HOST || 'amqp://guest:guest@localhost//';

const n = parseInt(process.argv.length > 2 ? process.argv[2] : 100, 10);
const L = `tasksSent (n=${n})`;
const L = `tasksCompleted (n=${n})`;

function* series(a) {
for(let i = 0; i < a; i += 1) {
yield i;
}
}
withClient(
AMQP_HOST,
{
sendTaskSentEvent: false,
},
async (client) => {
console.time(L);
const promises = [];
for (let i = 0; i < n; i++) {
const { writeResult, result } = client.call({
name: 'tasks.add',
args: [i, i],
});
if (!writeResult) {
console.log('waiting', i);
await client.waitForDrain();
}
promises.push(result.get());

var client = celery.connectWithUri(AMQP_HOST, function (err) {
assert(err == null);
const task = client.createTask('tasks.add', {}, { ignoreResult: false });
console.time(L);
each(series(n), function (i, cb) {
task.invoke([i, i], cb);
}, function (err) {
console.timeEnd(L);
if (err) {
console.error(err)
if (promises.length >= 1000) {
console.log('getting results', i, promises.length);
await Promise.all(promises);
promises.length = 0;
}
}
client.close(function() {
console.log('done')
});
});
});
console.log('all tasks sent');
return Promise.all(promises).then(
() => {
console.timeEnd(L);
},
(err) => {
console.timeEnd(L);
console.error(err);
},
);
},
);
Loading
Loading