Skip to content

Commit 48da361

Browse files
author
mezzeddine
committed
refactor: address review feedback for global OpenSearch index prefix
1 parent aacb41a commit 48da361

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

docs/source/AdministratorGuide/ExternalsSupport/index.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ You can run your OpenSearch cluster without authentication, or using User name a
5555
- ``ca_certs`` (default:``None``)
5656
- ``client_key`` (default:``None``)
5757
- ``client_cert`` (default:``None``)
58-
- ``IndexPrefix`` (default:``''``). Prefix prepended to all DIRAC-created OpenSearch indexes.
58+
- ``IndexPrefix`` (default:``''``). Prefix prepended to all DIRAC-created OpenSearch indexes. The prefix will be lower case only.
5959

6060

6161
to the location::

src/DIRAC/Core/Utilities/ElasticSearchDB.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def __init__(
127127
"""
128128

129129
self._connected = False
130-
self._globalIndexPrefix = (globalIndexPrefix or "").strip().lower()
130+
self.globalIndexPrefix = globalIndexPrefix
131131
if user and password:
132132
sLog.debug("Specified username and password")
133133
password = urlparse.quote_plus(password)
@@ -194,17 +194,20 @@ def __init__(
194194
except ElasticConnectionError as e:
195195
sLog.error(repr(e))
196196

197+
@property
198+
def globalIndexPrefix(self) -> str:
199+
"""Global prefix prepended to all index names and patterns."""
200+
return self._globalIndexPrefix
201+
202+
@globalIndexPrefix.setter
203+
def globalIndexPrefix(self, value: str):
204+
self._globalIndexPrefix = (value or "").strip().lower()
205+
197206
def _withGlobalPrefix(self, indexName):
198207
"""Prepend the global index prefix to an index name or pattern."""
199208
if not self._globalIndexPrefix:
200209
return indexName
201210

202-
if isinstance(indexName, (list, tuple)):
203-
return [self._withGlobalPrefix(name) for name in indexName]
204-
205-
if not isinstance(indexName, str):
206-
return indexName
207-
208211
prefixedTokens = []
209212
for token in indexName.split(","):
210213
strippedToken = token.strip()
@@ -556,9 +559,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim
556559
indexName = self.generateFullIndexName(indexPrefix, period)
557560
else:
558561
indexName = indexPrefix
559-
indexName = self._withGlobalPrefix(indexName)
560-
sLog.debug(f"Bulk indexing into {indexName} of {len(data)}")
562+
sLog.debug(f"Bulk indexing into {self._withGlobalPrefix(indexName)} of {len(data)}")
561563

564+
# Keep existence/creation checks on the raw name path; methods apply global prefix internally.
562565
res = self.existingIndex(indexName)
563566
if not res["OK"]:
564567
return res
@@ -567,6 +570,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim
567570
if not retVal["OK"]:
568571
return retVal
569572

573+
# Prefix exactly once for the direct bulk API call.
574+
indexName = self._withGlobalPrefix(indexName)
575+
570576
try:
571577
res = bulk(client=self.client, index=indexName, actions=generateDocs(data, withTimeStamp))
572578
except (BulkIndexError, RequestError) as e:

0 commit comments

Comments
 (0)