Skip to content

Commit bd18638

Browse files
author
mezzeddine
committed
feat: add global OpenSearch index prefix support (#8489)
1 parent 4c2a3eb commit bd18638

File tree

7 files changed

+80
-12
lines changed

7 files changed

+80
-12
lines changed

docs/source/AdministratorGuide/ExternalsSupport/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ You can run your OpenSearch cluster without authentication, or using User name a
5555
- ``ca_certs`` (default:``None``)
5656
- ``client_key`` (default:``None``)
5757
- ``client_cert`` (default:``None``)
58+
- ``IndexPrefix`` (default:``''``). Prefix prepended to all DIRAC-created OpenSearch indexes. The prefix will be lower case only.
5859

5960

6061
to the location::

docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ For example::
5050
{
5151
User = test
5252
Password = password
53+
IndexPrefix = mydirac-
5354
}
5455
}
5556

5657

57-
The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`:
58+
The following global option can be set in `Systems/NoSQLDatabases`:
5859

59-
*IndexPrefix*: Prefix used to prepend to indexes created in the ES instance.
60+
*IndexPrefix*: Prefix prepended to all indexes created in the OpenSearch instance.
6061

6162
For each monitoring types managed, the Period (how often a new index is created)
6263
can be defined with::

src/DIRAC/ConfigurationSystem/Client/Utilities.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,13 @@ def _getCACerts(cs_path):
532532
if ca_certs:
533533
parameters["ca_certs"] = ca_certs
534534

535+
# Global index prefix for all OpenSearch databases
536+
result = gConfig.getOption("/Systems/NoSQLDatabases/IndexPrefix")
537+
if result["OK"]:
538+
parameters["IndexPrefix"] = str(result["Value"]).strip().lower()
539+
else:
540+
parameters["IndexPrefix"] = ""
541+
535542
# Check optional parameters: Host, Port, SSL
536543
result = gConfig.getOption(cs_path + "/Host")
537544
if not result["OK"]:

src/DIRAC/Core/Base/ElasticDB.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ def __init__(self, fullName, indexPrefix="", parentLogger=None):
3434
self.__ca_certs = dbParameters.get("ca_certs", None)
3535
self.__client_key = dbParameters.get("client_key", None)
3636
self.__client_cert = dbParameters.get("client_cert", None)
37+
self.__globalIndexPrefix = dbParameters.get("IndexPrefix", "")
3738

3839
super().__init__(
3940
host=self._dbHost,
4041
port=self._dbPort,
4142
user=self.__user,
4243
password=self.__dbPassword,
4344
indexPrefix=indexPrefix,
45+
globalIndexPrefix=self.__globalIndexPrefix,
4446
useSSL=self.__useSSL,
4547
useCRT=self.__useCRT,
4648
ca_certs=self.__ca_certs,

src/DIRAC/Core/Utilities/ElasticSearchDB.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def __init__(
103103
user=None,
104104
password=None,
105105
indexPrefix="",
106+
globalIndexPrefix="",
106107
useSSL=True,
107108
useCRT=False,
108109
ca_certs=None,
@@ -117,6 +118,7 @@ def __init__(
117118
:param str user: user name to access the db
118119
:param str password: if the db is password protected we need to provide a password
119120
:param str indexPrefix: it is the indexPrefix used to get all indexes
121+
:param str globalIndexPrefix: prefix prepended to all index names and patterns
120122
:param bool useSSL: We can disable using secure connection. By default we use secure connection.
121123
:param bool useCRT: Use certificates.
122124
:param str ca_certs: CA certificates bundle.
@@ -125,6 +127,7 @@ def __init__(
125127
"""
126128

127129
self._connected = False
130+
self.globalIndexPrefix = globalIndexPrefix
128131
if user and password:
129132
sLog.debug("Specified username and password")
130133
password = urlparse.quote_plus(password)
@@ -191,6 +194,43 @@ def __init__(
191194
except ElasticConnectionError as e:
192195
sLog.error(repr(e))
193196

197+
@property
198+
def globalIndexPrefix(self) -> str:
199+
"""Global prefix prepended to all index names and patterns."""
200+
return self._globalIndexPrefix
201+
202+
@globalIndexPrefix.setter
203+
def globalIndexPrefix(self, value: str):
204+
self._globalIndexPrefix = (value or "").strip().lower()
205+
206+
def _withGlobalPrefix(self, indexName):
207+
"""Prepend the global index prefix to an index name or pattern."""
208+
if not self._globalIndexPrefix:
209+
return indexName
210+
211+
prefixedTokens = []
212+
for token in indexName.split(","):
213+
strippedToken = token.strip()
214+
if not strippedToken:
215+
prefixedTokens.append(strippedToken)
216+
continue
217+
218+
excluded = strippedToken.startswith("-")
219+
if excluded:
220+
strippedToken = strippedToken[1:]
221+
222+
if strippedToken == "_all":
223+
strippedToken = "*"
224+
225+
if not strippedToken.startswith(self._globalIndexPrefix):
226+
strippedToken = f"{self._globalIndexPrefix}{strippedToken}"
227+
228+
if excluded:
229+
strippedToken = f"-{strippedToken}"
230+
prefixedTokens.append(strippedToken)
231+
232+
return ",".join(prefixedTokens)
233+
194234
@ifConnected
195235
def addIndexTemplate(
196236
self, name: str, index_patterns: list, mapping: dict, priority: int = 1, settings: dict = None
@@ -204,6 +244,7 @@ def addIndexTemplate(
204244
"""
205245
if settings is None:
206246
settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}}
247+
index_patterns = [self._withGlobalPrefix(pattern) for pattern in index_patterns]
207248
body = {
208249
"index_patterns": index_patterns,
209250
"priority": priority,
@@ -225,6 +266,7 @@ def query(self, index: str, query):
225266
:param dict query: It is the query in OpenSearch DSL language
226267
227268
"""
269+
index = self._withGlobalPrefix(index)
228270
try:
229271
esDSLQueryResult = self.client.search(index=index, body=query)
230272
return S_OK(esDSLQueryResult)
@@ -247,6 +289,7 @@ def update(self, index: str, query=None, updateByQuery: bool = True, docID: str
247289
if not index or not query:
248290
return S_ERROR("Missing index or query")
249291

292+
index = self._withGlobalPrefix(index)
250293
try:
251294
if updateByQuery:
252295
esDSLQueryResult = self.client.update_by_query(index=index, body=query)
@@ -263,6 +306,7 @@ def getDoc(self, index: str, docID: str) -> dict:
263306
:param index: name of the index
264307
:param docID: document ID
265308
"""
309+
index = self._withGlobalPrefix(index)
266310
sLog.debug(f"Retrieving document {docID} in index {index}")
267311
try:
268312
return S_OK(self.client.get(index, docID)["_source"])
@@ -280,7 +324,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
280324
:param docIDs: document IDs
281325
"""
282326
sLog.debug(f"Retrieving documents {docIDs}")
283-
docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
327+
docs = [{"_index": self._withGlobalPrefix(indexFunc(docID, vo)), "_id": docID} for docID in docIDs]
284328
try:
285329
response = self.client.mget({"docs": docs})
286330
except RequestError as re:
@@ -298,6 +342,7 @@ def updateDoc(self, index: str, docID: str, body) -> dict:
298342
:param body: The request definition requires either `script` or
299343
partial `doc`
300344
"""
345+
index = self._withGlobalPrefix(index)
301346
sLog.debug(f"Updating document {docID} in index {index}")
302347
try:
303348
self.client.update(index, docID, body)
@@ -317,6 +362,7 @@ def deleteDoc(self, index: str, docID: str):
317362
:param index: name of the index
318363
:param docID: document ID
319364
"""
365+
index = self._withGlobalPrefix(index)
320366
sLog.debug(f"Deleting document {docID} in index {index}")
321367
try:
322368
return S_OK(self.client.delete(index, docID))
@@ -333,6 +379,7 @@ def existsDoc(self, index: str, docID: str) -> bool:
333379
:param index: name of the index
334380
:param docID: document ID
335381
"""
382+
index = self._withGlobalPrefix(index)
336383
sLog.debug(f"Checking if document {docID} in index {index} exists")
337384
return self.client.exists(index, docID)
338385

@@ -341,6 +388,7 @@ def _Search(self, indexname):
341388
"""
342389
it returns the object which can be used for retreiving certain value from the DB
343390
"""
391+
indexname = self._withGlobalPrefix(indexname)
344392
return Search(using=self.client, index=indexname)
345393

346394
def _Q(self, name_or_query="match", **params):
@@ -363,6 +411,7 @@ def getIndexes(self, indexName=None):
363411
"""
364412
if not indexName:
365413
indexName = ""
414+
indexName = self._withGlobalPrefix(indexName)
366415
sLog.debug(f"Getting indices alias of {indexName}")
367416
# we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
368417
return list(self.client.indices.get_alias(f"{indexName}*"))
@@ -376,6 +425,7 @@ def getDocTypes(self, indexName):
376425
:return: S_OK or S_ERROR
377426
"""
378427
result = []
428+
indexName = self._withGlobalPrefix(indexName)
379429
try:
380430
sLog.debug("Getting mappings for ", indexName)
381431
result = self.client.indices.get_mapping(indexName)
@@ -407,6 +457,7 @@ def existingIndex(self, indexName):
407457
:param str indexName: the name of the index
408458
:returns: S_OK/S_ERROR if the request is successful
409459
"""
460+
indexName = self._withGlobalPrefix(indexName)
410461
sLog.debug(f"Checking existance of index {indexName}")
411462
try:
412463
return S_OK(self.client.indices.exists(indexName))
@@ -428,6 +479,7 @@ def createIndex(self, indexPrefix, mapping=None, period="day"):
428479
else:
429480
sLog.warn("The period is not provided, so using non-periodic indexes names")
430481
fullIndex = indexPrefix
482+
fullIndex = self._withGlobalPrefix(fullIndex)
431483

432484
try:
433485
if not mapping:
@@ -444,6 +496,7 @@ def deleteIndex(self, indexName):
444496
"""
445497
:param str indexName: the name of the index to be deleted...
446498
"""
499+
indexName = self._withGlobalPrefix(indexName)
447500
sLog.info("Deleting index", indexName)
448501
try:
449502
retVal = self.client.indices.delete(indexName)
@@ -474,6 +527,7 @@ def index(self, indexName, body=None, docID=None, op_type="index"):
474527
if not indexName or not body:
475528
return S_ERROR("Missing index or body")
476529

530+
indexName = self._withGlobalPrefix(indexName)
477531
try:
478532
res = self.client.index(index=indexName, body=body, id=docID, params={"op_type": op_type})
479533
except (RequestError, TransportError) as e:
@@ -505,8 +559,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim
505559
indexName = self.generateFullIndexName(indexPrefix, period)
506560
else:
507561
indexName = indexPrefix
508-
sLog.debug(f"Bulk indexing into {indexName} of {len(data)}")
562+
sLog.debug(f"Bulk indexing into {self._withGlobalPrefix(indexName)} of {len(data)}")
509563

564+
# Keep existence/creation checks on the raw name path; methods apply global prefix internally.
510565
res = self.existingIndex(indexName)
511566
if not res["OK"]:
512567
return res
@@ -515,6 +570,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim
515570
if not retVal["OK"]:
516571
return retVal
517572

573+
# Prefix exactly once for the direct bulk API call.
574+
indexName = self._withGlobalPrefix(indexName)
575+
518576
try:
519577
res = bulk(client=self.client, index=indexName, actions=generateDocs(data, withTimeStamp))
520578
except (BulkIndexError, RequestError) as e:
@@ -534,6 +592,7 @@ def getUniqueValue(self, indexName, key, orderBy=False):
534592
:param dict orderBy: it is a dictionary in case we want to order the result {key:'desc'} or {key:'asc'}
535593
:returns: a list of unique value for a certain key from the dictionary.
536594
"""
595+
indexName = self._withGlobalPrefix(indexName)
537596

538597
query = self._Search(indexName)
539598

@@ -592,6 +651,7 @@ def deleteByQuery(self, indexName, query):
592651
:param str indexName: the name of the index
593652
:param str query: the JSON-formatted query for which we want to issue the delete
594653
"""
654+
indexName = self._withGlobalPrefix(indexName)
595655
try:
596656
self.client.delete_by_query(index=indexName, body=query)
597657
except Exception as inst:

src/DIRAC/Core/scripts/install_full.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,7 @@ LocalInstallation
142142
Password = <password>
143143
Host = <host>
144144
Port = <port>
145+
# Optional global prefix prepended to all DIRAC-created OpenSearch indexes
146+
# IndexPrefix = <prefix>
145147
}
146148
}

src/DIRAC/MonitoringSystem/DB/MonitoringDB.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
44
**Configuration Parameters**:
55
6-
The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`
7-
8-
* *IndexPrefix*: Prefix used to prepend to indexes created in the OpenSearch instance.
6+
The global OpenSearch index prefix can be set in
7+
`Systems/NoSQLDatabases/IndexPrefix`.
98
109
For each monitoring types managed, the Period (how often a new index is created)
1110
can be defined with::
@@ -31,8 +30,6 @@
3130
import time
3231

3332
from DIRAC import S_ERROR, S_OK
34-
from DIRAC.ConfigurationSystem.Client.Config import gConfig
35-
from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection
3633
from DIRAC.Core.Base.ElasticDB import ElasticDB
3734
from DIRAC.Core.Utilities.Plotting.TypeLoader import TypeLoader
3835

@@ -45,10 +42,8 @@ def __init__(self, name="Monitoring/MonitoringDB"):
4542
"""Standard constructor"""
4643

4744
try:
48-
section = getDatabaseSection("Monitoring/MonitoringDB")
49-
indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", "").lower()
5045
# Connecting to the ES cluster
51-
super().__init__(fullName=name, indexPrefix=indexPrefix)
46+
super().__init__(fullName=name)
5247
except RuntimeError as ex:
5348
self.log.error("Can't connect to MonitoringDB", repr(ex))
5449
raise ex

0 commit comments

Comments
 (0)