Skip to content

Commit 8fa05c6

Browse files
clean up server selection logic
1 parent 67081d4 commit 8fa05c6

2 files changed

Lines changed: 101 additions & 74 deletions

File tree

src/sdam/server_selection.ts

Lines changed: 100 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MongoInvalidArgumentError } from '../error';
1+
import { MongoInvalidArgumentError, MongoRuntimeError } from '../error';
22
import { ReadPreference } from '../read_preference';
33
import { ServerType, TopologyType } from './common';
44
import type { ServerDescription, TagSet } from './server_description';
@@ -282,44 +282,21 @@ function isDeprioritizedFactory(
282282
!deprioritized.has(server);
283283
}
284284

285-
/**
286-
* Returns a function which selects servers based on a provided read preference
287-
*
288-
* @param readPreference - The read preference to select with
289-
*/
290-
export function readPreferenceServerSelector(readPreference: ReadPreference): ServerSelector {
291-
if (!readPreference.isValid()) {
292-
throw new MongoInvalidArgumentError('Invalid read preference specified');
293-
}
294-
295-
return function readPreferenceServers(
296-
topologyDescription: TopologyDescription,
297-
servers: ServerDescription[],
298-
deprioritized: DeprioritizedServers
299-
): ServerDescription[] {
300-
if (topologyDescription.type === TopologyType.LoadBalanced) {
301-
return servers.filter(loadBalancerFilter);
302-
}
303-
304-
if (topologyDescription.type === TopologyType.Unknown) {
305-
return [];
306-
}
307-
308-
if (topologyDescription.type === TopologyType.Single) {
309-
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
310-
}
311-
312-
if (topologyDescription.type === TopologyType.Sharded) {
313-
const selectable = filterDeprioritized(servers, deprioritized);
314-
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
315-
}
316-
317-
const mode = readPreference.mode;
318-
if (mode === ReadPreference.PRIMARY) {
319-
return filterDeprioritized(servers.filter(primaryFilter), deprioritized);
320-
}
321-
322-
if (mode === ReadPreference.PRIMARY_PREFERRED) {
285+
function secondarySelector(
286+
readPreference: ReadPreference,
287+
topologyDescription: TopologyDescription,
288+
servers: ServerDescription[],
289+
deprioritized: DeprioritizedServers
290+
) {
291+
const mode = readPreference.mode;
292+
switch (mode) {
293+
case 'primary':
294+
// Note: no need to filter for deprioritized servers. A replica set has only one primary; that means that
295+
// we are in one of two scenarios:
296+
// 1. deprioritized servers is empty - return the primary.
297+
// 2. deprioritized servers contains the primary - return the primary.
298+
return servers.filter(primaryFilter);
299+
case 'primaryPreferred': {
323300
const primary = servers.filter(primaryFilter);
324301

325302
// If there is a primary and it is not deprioritized, use the primary. Otherwise,
@@ -329,60 +306,110 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
329306
return eligiblePrimary;
330307
}
331308

309+
// If we make it here, we either have:
310+
// 1. a deprioritized primary
311+
// 2. no eligible primary
312+
// secondaries take precedence of deprioritized primaries.
332313
const secondaries = tagSetReducer(
333314
readPreference,
334315
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
335316
);
336-
const deprioritizedSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
337317

338-
// console.error({ deprioritizedSecondaries, secondaries, deprioritized });
339-
if (deprioritizedSecondaries.length)
340-
return latencyWindowReducer(topologyDescription, deprioritizedSecondaries);
318+
const eligibleSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
319+
if (eligibleSecondaries.length) {
320+
return latencyWindowReducer(topologyDescription, eligibleSecondaries);
321+
}
341322

342323
// if we make it here, we have no primaries or secondaries that not deprioritized.
343324
// prefer the primary (which may not exist, if the topology has no primary).
344325
// otherwise, return the secondaries (which also may not exist, but there is nothing else to check here).
345326
return primary.length ? primary : latencyWindowReducer(topologyDescription, secondaries);
346327
}
347-
348-
// TODO: should we be applying the latency window to nearest servers?
349-
if (mode === 'nearest') {
350-
// if read preference is nearest
351-
return latencyWindowReducer(
352-
topologyDescription,
353-
filterDeprioritized(
354-
tagSetReducer(
355-
readPreference,
356-
maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter))
357-
),
358-
deprioritized
359-
)
328+
case 'nearest': {
329+
const eligible = filterDeprioritized(
330+
tagSetReducer(
331+
readPreference,
332+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter))
333+
),
334+
deprioritized
360335
);
336+
return latencyWindowReducer(topologyDescription, eligible);
361337
}
338+
case 'secondary':
339+
case 'secondaryPreferred': {
340+
const secondaries = tagSetReducer(
341+
readPreference,
342+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
343+
);
344+
const eligibleSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
362345

363-
const filter = secondaryFilter;
346+
if (eligibleSecondaries.length) {
347+
return latencyWindowReducer(topologyDescription, eligibleSecondaries);
348+
}
364349

365-
const secondaries = tagSetReducer(
366-
readPreference,
367-
maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))
368-
);
369-
const eligibleSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
350+
// we have no eligible secondaries, try for a primary if we can.
351+
if (mode === ReadPreference.SECONDARY_PREFERRED) {
352+
const primary = servers.filter(primaryFilter);
370353

371-
if (eligibleSecondaries.length)
372-
return latencyWindowReducer(topologyDescription, eligibleSecondaries);
354+
// unlike readPreference=primary, here we do filter for deprioritized servers.
355+
// if the primary is deprioritized, deprioritized secondaries take precedence.
356+
const eligiblePrimary = primary.filter(isDeprioritizedFactory(deprioritized));
357+
if (eligiblePrimary.length) return eligiblePrimary;
373358

374-
// we have no eligible secondaries, try for a primary.
375-
if (mode === ReadPreference.SECONDARY_PREFERRED) {
376-
const primary = servers.filter(primaryFilter);
377-
const eligiblePrimary = primary.filter(isDeprioritizedFactory(deprioritized));
359+
// we have no eligible primary nor secondaries that have not been deprioritized
360+
return secondaries.length
361+
? latencyWindowReducer(topologyDescription, secondaries)
362+
: primary;
363+
}
378364

379-
if (eligiblePrimary.length) return eligiblePrimary;
365+
// return all secondaries in the latency window.
366+
return latencyWindowReducer(topologyDescription, secondaries);
367+
}
380368

381-
// we have no eligible primary nor secondaries that have not been deprioritized
382-
return secondaries.length ? latencyWindowReducer(topologyDescription, secondaries) : primary;
369+
default: {
370+
const _exhaustiveCheck: never = mode;
371+
throw new MongoRuntimeError(
372+
`unexpected readPreference=${mode} (should never happen). Please report a bug in the Node driver Jira project.`
373+
);
383374
}
375+
}
376+
}
384377

385-
// return all secondaries in the latency window.
386-
return latencyWindowReducer(topologyDescription, secondaries);
378+
/**
379+
* Returns a function which selects servers based on a provided read preference
380+
*
381+
* @param readPreference - The read preference to select with
382+
*/
383+
export function readPreferenceServerSelector(readPreference: ReadPreference): ServerSelector {
384+
if (!readPreference.isValid()) {
385+
throw new MongoInvalidArgumentError('Invalid read preference specified');
386+
}
387+
388+
return function readPreferenceServers(
389+
topologyDescription: TopologyDescription,
390+
servers: ServerDescription[],
391+
deprioritized: DeprioritizedServers
392+
): ServerDescription[] {
393+
switch (topologyDescription.type) {
394+
case 'Single':
395+
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
396+
case 'ReplicaSetNoPrimary':
397+
case 'ReplicaSetWithPrimary':
398+
return secondarySelector(readPreference, topologyDescription, servers, deprioritized);
399+
case 'Sharded': {
400+
const selectable = filterDeprioritized(servers, deprioritized);
401+
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
402+
}
403+
case 'Unknown':
404+
return [];
405+
case 'LoadBalanced':
406+
return servers.filter(loadBalancerFilter);
407+
default: {
408+
const _exhaustiveCheck: never = topologyDescription.type;
409+
throw new MongoRuntimeError(
410+
`unexpected topology type: ${topologyDescription.type} (this should never happen). Please file a bug in the Node driver Jira project.`
411+
);
412+
}
413+
}
387414
};
388415
}

test/unit/assorted/imports.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ function* walk(root) {
1515
}
1616
}
1717

18-
describe.skip('importing mongodb driver', () => {
18+
describe('importing mongodb driver', () => {
1919
const sourceFiles = walk(path.resolve(__dirname, '../../../src'));
2020

2121
for (const sourceFile of sourceFiles) {

0 commit comments

Comments
 (0)