Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "hawk.api",
"version": "1.5.1",
"version": "1.5.2",
"main": "index.ts",
"license": "BUSL-1.1",
"scripts": {
Expand Down
27 changes: 27 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ class HawkAPI {

await redis.initialize();

this.registerShutdownHandlers(redis);

/**
* Setup shared factories for SSO and GitHub integration routes
* These endpoints don't require per-request DataLoaders isolation,
Expand Down Expand Up @@ -293,6 +295,31 @@ class HawkAPI {
});
});
}

/**
* Closes HTTP, Mongo and Redis connections on SIGINT/SIGTERM.
*
* @param redis - Redis helper to close on shutdown
*/
private registerShutdownHandlers(redis: RedisHelper): void {
let shuttingDown = false;

const shutdown = async (signal: NodeJS.Signals): Promise<void> => {
if (shuttingDown) {
return;
}
shuttingDown = true;
console.log(`[Shutdown] ${signal} received, closing connections`);

await new Promise<void>((resolve) => this.httpServer.close(() => resolve()));
await Promise.allSettled([mongo.closeConnections(), redis.close()]);

process.exit(0);
};

process.once('SIGINT', shutdown);
process.once('SIGTERM', shutdown);
Comment thread
Kuchizu marked this conversation as resolved.
}
}

export default HawkAPI;
150 changes: 124 additions & 26 deletions src/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ import { setupMongoMetrics, withMongoMetrics } from './metrics';
const hawkDBUrl = process.env.MONGO_HAWK_DB_URL || 'mongodb://localhost:27017/hawk';
const eventsDBUrl = process.env.MONGO_EVENTS_DB_URL || 'mongodb://localhost:27017/events';

const reconnectTries = Number(process.env.MONGO_RECONNECT_TRIES) || 60;
const reconnectInterval = Number(process.env.MONGO_RECONNECT_INTERVAL) || 1000;

Comment thread
Kuchizu marked this conversation as resolved.
/**
* serverSelectionTimeoutMS bounds how long an op waits for an available
* server — without it queries hang forever during an outage.
*/
const connectionConfig: MongoClientOptions = withMongoMetrics({
serverSelectionTimeoutMS: 10000,
socketTimeoutMS: 45000,
retryWrites: true,
retryReads: true,
});

/**
* Connections to Hawk databases
*/
Expand Down Expand Up @@ -52,40 +66,124 @@ export const mongoClients: MongoClients = {
};

/**
* Common params for all connections
* Connects to the given URL, retrying with a fixed interval up to
* MONGO_RECONNECT_TRIES times before giving up.
*
* @param name - logical name for logging
* @param url - MongoDB connection string
* @returns connected client
*/
async function connectWithRetry(name: string, url: string): Promise<MongoClient> {
let lastError = 'unknown error';

for (let attempt = 1; attempt <= reconnectTries; attempt++) {
const client = new MongoClient(url, connectionConfig);

try {
await client.connect();
console.log(`[Mongo:${name}] connected`);

return client;
} catch (err) {
await client.close().catch(() => undefined);

lastError = (err as Error)?.message ?? String(err);
console.warn(`[Mongo:${name}] attempt ${attempt}/${reconnectTries} failed: ${lastError}`);

if (attempt < reconnectTries) {
await new Promise((resolve) => setTimeout(resolve, reconnectInterval));
}
}
Comment thread
Kuchizu marked this conversation as resolved.
}

throw new Error(`[Mongo:${name}] failed after ${reconnectTries} attempts: ${lastError}`);
}

/**
* Common params for all connections
* Note: useNewUrlParser and useUnifiedTopology are deprecated in mongodb 6.x and removed
* Logs and reports heartbeat failures / recoveries once per transition.
*
* @param name - logical name for logging
* @param client - connected client to observe
*/
const connectionConfig: MongoClientOptions = withMongoMetrics({});
function watchConnection(name: string, client: MongoClient): void {
let healthy = true;

client.on('serverHeartbeatFailed', (event) => {
if (!healthy) {
return;
}
healthy = false;
const message = (event.failure as Error)?.message ?? 'heartbeat failed';

console.error(`[Mongo:${name}] connection lost: ${message}`);
HawkCatcher.send(new Error(`MongoDB ${name} connection lost: ${message}`));
});

client.on('serverHeartbeatSucceeded', () => {
if (healthy) {
return;
}
healthy = true;
console.log(`[Mongo:${name}] connection recovered`);
});
}

/**
* Setups connections to the databases (hawk api and events databases)
* Connects to both databases with bounded retry. The driver auto-recovers
* from transient failures on already-open clients, so retries here cover
* the initial handshake only.
*
* @returns promise resolved when both clients are connected
*/
export async function setupConnections(): Promise<void> {
try {
const [hawkMongoClient, eventsMongoClient] = await Promise.all([
MongoClient.connect(hawkDBUrl, connectionConfig),
MongoClient.connect(eventsDBUrl, connectionConfig),
]);

mongoClients.hawk = hawkMongoClient;
mongoClients.events = eventsMongoClient;

databases.hawk = hawkMongoClient.db();
databases.events = eventsMongoClient.db();

/**
* Log and and measure MongoDB metrics
*/
setupMongoMetrics(hawkMongoClient);
setupMongoMetrics(eventsMongoClient);
} catch (e) {
/** Catch start Mongo errors */
HawkCatcher.send(e as Error);
throw e;
const results = await Promise.allSettled([
connectWithRetry('hawk', hawkDBUrl),
connectWithRetry('events', eventsDBUrl),
]);

const failure = results.find((r): r is PromiseRejectedResult => r.status === 'rejected');

if (failure) {
/** Close any clients that did connect so we don't leak sockets */
await Promise.allSettled(
results.map((r) => (r.status === 'fulfilled' ? r.value.close() : Promise.resolve()))
);
HawkCatcher.send(failure.reason as Error);
throw failure.reason;
}

const hawkClient = (results[0] as PromiseFulfilledResult<MongoClient>).value;
const eventsClient = (results[1] as PromiseFulfilledResult<MongoClient>).value;

mongoClients.hawk = hawkClient;
mongoClients.events = eventsClient;
databases.hawk = hawkClient.db();
databases.events = eventsClient.db();

/**
* Log and measure MongoDB metrics, then observe heartbeats for outage logs
*/
setupMongoMetrics(hawkClient);
setupMongoMetrics(eventsClient);
watchConnection('hawk', hawkClient);
watchConnection('events', eventsClient);
}

/**
* Closes both clients. Call from SIGTERM/SIGINT for graceful shutdown.
*
* @returns promise resolved once both clients are closed
*/
export async function closeConnections(): Promise<void> {
await Promise.allSettled([
mongoClients.hawk?.close(),
mongoClients.events?.close(),
]);

mongoClients.hawk = null;
mongoClients.events = null;
databases.hawk = null;
databases.events = null;
}

/**
Expand Down
86 changes: 86 additions & 0 deletions test/mongo.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const connectMock = jest.fn();
const closeMock = jest.fn().mockResolvedValue(undefined);

jest.mock('mongodb', () => ({
MongoClient: jest.fn().mockImplementation(() => ({
connect: connectMock,
close: closeMock,
db: jest.fn().mockReturnValue({ databaseName: 'test' }),
on: jest.fn(),
})),
}));

jest.mock('@hawk.so/nodejs', () => ({
__esModule: true,
default: { send: jest.fn() },
}));

jest.mock('../src/metrics', () => ({
setupMongoMetrics: jest.fn(),
withMongoMetrics: (options: Record<string, unknown>): Record<string, unknown> => options,
}));

/**
* Loads a fresh copy of src/mongo with the given retry env vars applied.
*
* @param tries - value for MONGO_RECONNECT_TRIES
* @returns the freshly required mongo module
*/
function loadMongo(tries: number): typeof import('../src/mongo') {
jest.resetModules();
process.env.MONGO_RECONNECT_TRIES = String(tries);
process.env.MONGO_RECONNECT_INTERVAL = '1';

return require('../src/mongo');
}

describe('mongo connection', () => {
beforeEach(() => {
jest.clearAllMocks();
jest.spyOn(console, 'log').mockImplementation(() => undefined);
jest.spyOn(console, 'warn').mockImplementation(() => undefined);
jest.spyOn(console, 'error').mockImplementation(() => undefined);
});

afterEach(() => {
jest.restoreAllMocks();
});

test('retries on failure and connects when a later attempt succeeds', async () => {
connectMock
.mockRejectedValueOnce(new Error('down'))
.mockRejectedValueOnce(new Error('down'))
.mockResolvedValue(undefined);

const mongo = loadMongo(3);

await expect(mongo.setupConnections()).resolves.toBeUndefined();
expect(mongo.databases.hawk).not.toBeNull();
expect(mongo.databases.events).not.toBeNull();
});

test('rejects after exhausting MONGO_RECONNECT_TRIES attempts', async () => {
connectMock.mockRejectedValue(new Error('down'));

const mongo = loadMongo(3);

await expect(mongo.setupConnections()).rejects.toThrow(/failed after 3 attempts/);
/** 3 tries per client, hawk + events run in parallel */
expect(connectMock).toHaveBeenCalledTimes(6);
});

test('closeConnections closes clients and nulls exported handles', async () => {
connectMock.mockResolvedValue(undefined);

const mongo = loadMongo(3);

await mongo.setupConnections();
await mongo.closeConnections();

expect(closeMock).toHaveBeenCalledTimes(2);
expect(mongo.databases.hawk).toBeNull();
expect(mongo.databases.events).toBeNull();
expect(mongo.mongoClients.hawk).toBeNull();
expect(mongo.mongoClients.events).toBeNull();
});
});
Loading