-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathmachine_workflow.ts
More file actions
148 lines (134 loc) · 4.86 KB
/
machine_workflow.ts
File metadata and controls
148 lines (134 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import { setTimeout } from 'timers/promises';
import { type Document } from '../../../bson';
import { type IO } from '../../../mongo_client';
import { ns } from '../../../utils';
import type { Connection } from '../../connection';
import type { MongoCredentials } from '../mongo_credentials';
import type { Workflow } from '../mongodb_oidc';
import { finishCommandDocument } from './command_builders';
import { type TokenCache } from './token_cache';
/** The time to throttle callback calls. */
const THROTTLE_MS = 100;
/**
* The access token format.
* @internal
*/
export interface AccessToken {
access_token: string;
expires_in?: number;
}
/** @internal */
export type OIDCTokenFunction = (
credentials: MongoCredentials,
client: { io: IO }
) => Promise<AccessToken>;
/**
* Common behaviour for OIDC machine workflows.
* @internal
*/
export abstract class MachineWorkflow implements Workflow {
cache: TokenCache;
callback: OIDCTokenFunction;
lastExecutionTime: number;
client: { io: IO };
/**
* Instantiate the machine workflow.
*/
constructor(client: { io: IO }, cache: TokenCache) {
this.client = client;
this.cache = cache;
this.callback = this.withLock(this.getToken.bind(this));
this.lastExecutionTime = Date.now() - THROTTLE_MS;
}
/**
* Execute the workflow. Gets the token from the subclass implementation.
*/
async execute(connection: Connection, credentials: MongoCredentials): Promise<void> {
const token = await this.getTokenFromCacheOrEnv(connection, credentials);
const command = finishCommandDocument(token);
await connection.command(ns(credentials.source), command, undefined);
}
/**
* Reauthenticate on a machine workflow just grabs the token again since the server
* has said the current access token is invalid or expired.
*/
async reauthenticate(connection: Connection, credentials: MongoCredentials): Promise<void> {
if (this.cache.hasAccessToken) {
// Reauthentication implies the token has expired.
if (connection.accessToken === this.cache.getAccessToken()) {
// If connection's access token is the same as the cache's, remove
// the token from the cache and connection.
this.cache.removeAccessToken();
delete connection.accessToken;
} else {
// If the connection's access token is different from the cache's, set
// the cache's token on the connection and do not remove from the
// cache.
connection.accessToken = this.cache.getAccessToken();
}
}
await this.execute(connection, credentials);
}
/**
* Get the document to add for speculative authentication.
*/
async speculativeAuth(connection: Connection, credentials: MongoCredentials): Promise<Document> {
// The spec states only cached access tokens can use speculative auth.
if (!this.cache.hasAccessToken) {
return {};
}
const token = await this.getTokenFromCacheOrEnv(connection, credentials);
const document = finishCommandDocument(token);
document.db = credentials.source;
return { speculativeAuthenticate: document };
}
/**
* Get the token from the cache or environment.
*/
private async getTokenFromCacheOrEnv(
connection: Connection,
credentials: MongoCredentials
): Promise<string> {
if (this.cache.hasAccessToken) {
const token = this.cache.getAccessToken();
// New connections won't have an access token so ensure we set here.
if (!connection.accessToken) {
connection.accessToken = token;
}
return token;
} else {
const token = await this.callback(credentials, connection.client);
this.cache.put({ accessToken: token.access_token, expiresInSeconds: token.expires_in });
// Put the access token on the connection as well.
connection.accessToken = token.access_token;
return token.access_token;
}
}
/**
* Ensure the callback is only executed one at a time, and throttled to
* only once per 100ms.
*/
private withLock(callback: OIDCTokenFunction): OIDCTokenFunction {
let lock: Promise<any> = Promise.resolve();
return async (credentials: MongoCredentials): Promise<AccessToken> => {
// We do this to ensure that we would never return the result of the
// previous lock, only the current callback's value would get returned.
await lock;
lock = lock
.catch(() => null)
.then(async () => {
const difference = Date.now() - this.lastExecutionTime;
if (difference <= THROTTLE_MS) {
await setTimeout(THROTTLE_MS - difference);
}
this.lastExecutionTime = Date.now();
return await callback(credentials, this.client);
});
return await lock;
};
}
/**
* Get the token from the environment or endpoint.
*/
abstract getToken(credentials: MongoCredentials, client: { io: IO }): Promise<AccessToken>;
}