Skip to content

Commit

Permalink
fix issues with task queue
Browse files Browse the repository at this point in the history
  • Loading branch information
lalalune committed Mar 4, 2025
1 parent 6a2355d commit dff1d5c
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 21 deletions.
4 changes: 2 additions & 2 deletions packages/agent/src/swarm/investmentManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ const config: OnboardingConfig = {

export default {
plugins: [
degenIntelPlugin,
// degenIntelPlugin,
degenTraderPlugin,
communityTraderPlugin,
// communityTraderPlugin,
],
character,
init: (runtime: IAgentRuntime) => initCharacter({ runtime, config }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const registerTasks = async (runtime: IAgentRuntime, worldId?: UUID) => {
updatedAt: Date.now(),
updateInterval: 1000 * 60 * 60, // 1 hour
},
tags: ["queue", "repeat"],
tags: ["queue", "schedule", "degen_intel"],
});

runtime.registerTaskWorker({
Expand All @@ -50,7 +50,7 @@ export const registerTasks = async (runtime: IAgentRuntime, worldId?: UUID) => {
updatedAt: Date.now(),
updateInterval: 1000 * 60 * 5, // 5 minutes
},
tags: ["queue", "repeat"],
tags: ["queue", "schedule", "degen_intel"],
});

runtime.registerTaskWorker({
Expand All @@ -72,7 +72,7 @@ export const registerTasks = async (runtime: IAgentRuntime, worldId?: UUID) => {
updatedAt: Date.now(),
updateInterval: 1000 * 60 * 15, // 15 minutes
},
tags: ["queue", "repeat"],
tags: ["queue", "schedule", "degen_intel"],
});

runtime.registerTaskWorker({
Expand All @@ -94,7 +94,7 @@ export const registerTasks = async (runtime: IAgentRuntime, worldId?: UUID) => {
updatedAt: Date.now(),
updateInterval: 1000 * 60 * 5, // 5 minutes
},
tags: ["queue", "repeat"],
tags: ["queue", "schedule", "degen_intel"],
});

runtime.registerTaskWorker({
Expand All @@ -116,7 +116,7 @@ export const registerTasks = async (runtime: IAgentRuntime, worldId?: UUID) => {
updatedAt: Date.now(),
updateInterval: 1000 * 60 * 5, // 5 minutes
},
tags: ["queue"],
tags: ["queue", "schedule", "degen_intel"],
});

runtime.registerTaskWorker({
Expand All @@ -138,6 +138,6 @@ export const registerTasks = async (runtime: IAgentRuntime, worldId?: UUID) => {
updatedAt: Date.now(),
updateInterval: 1000 * 60 * 60 * 24, // 24 hours
},
tags: ["queue"],
tags: ["queue", "schedule", "degen_intel"],
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class DegenTradingService extends Service {
// For tracking pending sells
private pendingSells: { [tokenAddress: string]: bigint } = {};

static serviceType = "degen_trading";
static serviceType = "degen_trader";

constructor(protected runtime: IAgentRuntime) {
super(runtime);
Expand Down Expand Up @@ -657,7 +657,7 @@ export class DegenTradingService extends Service {
roomId: `trade-0000-0000-0000-${Date.now().toString(16)}`, // Generate a unique room ID
name: "EXECUTE_BUY",
description: `Execute buy for ${signal.tokenAddress}`,
tags: ["queue", "trade", "buy"],
tags: ["queue", "schedule", "degen_trader"],
metadata: {
signal,
updatedAt: Date.now(),
Expand All @@ -678,7 +678,7 @@ export class DegenTradingService extends Service {
roomId: `trade-0000-0000-0000-${Date.now().toString(16)}`, // Generate a unique room ID
name: "EXECUTE_SELL",
description: `Execute sell for ${signal.tokenAddress}`,
tags: ["queue", "trade", "sell"],
tags: ["queue", "schedule", "degen_trader"],
metadata: {
signal,
updatedAt: Date.now(),
Expand Down Expand Up @@ -803,6 +803,7 @@ export class DegenTradingService extends Service {
this.runtime.registerTaskWorker({
name: "EXECUTE_BUY",
execute: async (_runtime: IAgentRuntime, options: any) => {
logger.info("*** EXECUTE_BUY ***");
await this.executeBuyTask(options);
},
validate: async () => true
Expand All @@ -812,6 +813,7 @@ export class DegenTradingService extends Service {
this.runtime.registerTaskWorker({
name: "EXECUTE_SELL",
execute: async (_runtime: IAgentRuntime, options: any) => {
logger.info("*** EXECUTE_SELL ***");
await this.executeSellTask(options);
},
validate: async () => true
Expand All @@ -821,6 +823,7 @@ export class DegenTradingService extends Service {
this.runtime.registerTaskWorker({
name: "GENERATE_BUY_SIGNAL",
execute: async () => {
logger.info("*** GENERATE_BUY_SIGNAL ***");
await this.generateBuySignal();
},
validate: async () => true
Expand All @@ -830,6 +833,7 @@ export class DegenTradingService extends Service {
this.runtime.registerTaskWorker({
name: "SYNC_WALLET",
execute: async () => {
logger.info("*** SYNC_WALLET ***");
await this.syncWallet();
},
validate: async () => true
Expand All @@ -839,6 +843,7 @@ export class DegenTradingService extends Service {
this.runtime.registerTaskWorker({
name: "MONITOR_TOKEN",
execute: async (_runtime: IAgentRuntime, options: any) => {
logger.info("*** MONITOR_TOKEN ***");
await this.monitorToken(options);
},
validate: async () => true
Expand All @@ -854,9 +859,10 @@ export class DegenTradingService extends Service {
* Creates scheduled tasks
*/
private async createScheduledTasks() {
console.log("*** Creating scheduled tasks ***");
// Clear existing schedules for this agent
const existingTasks = await this.runtime.databaseAdapter.getTasks({
tags: ["queue", "schedule", "trade"]
tags: ["queue", "schedule", "degen_trader"],
});

for (const task of existingTasks) {
Expand All @@ -869,7 +875,7 @@ export class DegenTradingService extends Service {
roomId: this.runtime.agentId,
name: "GENERATE_BUY_SIGNAL",
description: "Generate buy signal every 10 minutes",
tags: ["queue", "schedule", "trade"],
tags: ["queue", "schedule", "degen_trader"],
metadata: {
updatedAt: Date.now(),
updateInterval: 600000, // 10 minutes
Expand All @@ -883,7 +889,7 @@ export class DegenTradingService extends Service {
roomId: this.runtime.agentId,
name: "SYNC_WALLET",
description: "Sync wallet information every 10 minutes",
tags: ["queue", "schedule", "trade"],
tags: ["queue", "schedule", "degen_trader"],
metadata: {
updatedAt: Date.now(),
updateInterval: 600000, // 10 minutes
Expand Down Expand Up @@ -1001,7 +1007,7 @@ export class DegenTradingService extends Service {

// Cancel all scheduled tasks
const existingTasks = await this.runtime.databaseAdapter.getTasks({
tags: ["queue", "schedule", "trade"]
tags: ["queue", "schedule", "degen_trader"],
});

for (const task of existingTasks) {
Expand Down Expand Up @@ -1104,7 +1110,7 @@ export class DegenTradingService extends Service {
roomId: this.runtime.agentId,
name: "MONITOR_TOKEN",
description: `Monitor token ${data.tokenAddress}`,
tags: ["queue", "monitor", "trade"],
tags: ["queue", "schedule", "degen_trader"],
metadata: {
tokenAddress: data.tokenAddress,
initialPrice: data.initialPrice,
Expand All @@ -1129,7 +1135,7 @@ export class DegenTradingService extends Service {
try {
// Find monitoring tasks for this process
const tasks = await this.runtime.databaseAdapter.getTasks({
tags: ["queue", "monitor", "trade"]
tags: ["queue", "schedule", "degen_trader"],
});

// Delete all related monitoring tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,5 @@ export interface StartDegenProcessParams extends StartProcessParams {
}

export const ServiceTypes = {
DEGEN_TRADING: "degen_trading",
DEGEN_TRADING: "degen_trader",
} as const;
4 changes: 1 addition & 3 deletions packages/plugin-sql/src/base.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
type Agent,
type Character,
type Component,
DatabaseAdapter,
type Entity,
Expand All @@ -11,7 +10,6 @@ import {
type Participant,
type Relationship,
type RoomData,
stringToUuid,
type Task,
type UUID,
type WorldData
Expand Down Expand Up @@ -1761,7 +1759,7 @@ export abstract class BaseDrizzleAdapter<TDatabase extends DrizzleOperations>
if (params.tags && params.tags.length > 0) {
// Filter by tags - tasks that have all of the specified tags
query = query.where(
sql`${taskTable.tags} @> ARRAY[${sql.join(params.tags, ', ')}]::text[]`
sql`${taskTable.tags} && array[${params.tags.map(tag => sql`${tag}`).join(', ')}]::text[]`
);
}

Expand Down

0 comments on commit dff1d5c

Please sign in to comment.