Implementing log aggregation
For a conceptual overview of logging aggregation, consult the logging concept page.
Overview
Approach
The Stackable Data Platform (SDP) does *not* implement product logging where none exists, but rather provides a way for the user to interact with whatever logging framework is already in place, and ensures that the log output can be consolidated and preserved in an output sink (e.g. OpenSearch). Products use various logging libraries such as Log4j, Log4j2, Logback, Python logging, etc., and SDP log aggregation enables:
- the extension or overriding of the default product logging configuration (so that e.g. the log level can be changed): this assumes knowledge of the product logging library
- the capture and parsing of stdout and stderr logging statements
- the consolidation of both of the above into a standard logging format which is written to the aggregation sink
Implementing logging for a given operator and product will require code:
- in a logging module specific to the product
- in the operator-rs framework library
- in the product controller
These will be described in the next section.
Implementation details
Logging module
Most products will have a logging module (e.g. product-logging.rs) that contains two functions:
resolve_vector_aggregator_address
pub async fn resolve_vector_aggregator_address(
    client: &Client,
    ... // other parameters dependent on calling context
) -> Result<Option<String>> {
This function will typically be called early in the reconcile step to determine whether a Vector ConfigMap has been mounted and, if so, under which address the Vector process is running.
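To illustrate what this lookup involves, here is a minimal, hypothetical sketch using plain kube-rs. It assumes the aggregator discovery ConfigMap publishes the address under an ADDRESS key; the real function uses the stackable-operator Client and operator-specific error types:

use k8s_openapi::api::core::v1::ConfigMap;
use kube::{Api, Client};

// Illustrative sketch only, not the operator-rs implementation.
async fn resolve_vector_aggregator_address_sketch(
    client: Client,
    namespace: &str,
    // e.g. the ConfigMap name referenced in the cluster spec (assumed parameter)
    discovery_config_map: Option<&str>,
) -> anyhow::Result<Option<String>> {
    // No discovery ConfigMap referenced: log aggregation is not configured.
    let Some(cm_name) = discovery_config_map else {
        return Ok(None);
    };
    let cm = Api::<ConfigMap>::namespaced(client, namespace)
        .get(cm_name)
        .await?;
    // Assumption: the aggregator address is published under the "ADDRESS" key.
    Ok(cm.data.and_then(|data| data.get("ADDRESS").cloned()))
}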
extend_config_map (or similar)
pub fn extend_config_map<C, K>(
    role_group: &RoleGroupRef<K>,
    vector_aggregator_address: Option<&str>,
    logging: &Logging<C>,
    ... // other parameters dependent on calling context
    cm_builder: &mut ConfigMapBuilder,
) -> Result<()> {
This function will typically be called from the product controller at the point at which the RoleGroup ConfigMap is being defined. It issues internal calls to the framework to conditionally retrieve:
- a default logging configuration for the given product logging library, e.g. product_logging::framework::create_log4j2_config
- a Vector configuration, retrieved from product_logging::framework::create_vector_config
For the first of these, the framework provides ready-made configurations for several common libraries. These can be overridden by providing a specific logging configuration in a ConfigMap that is referenced directly in the custom resource. The second is enabled by a boolean flag. Both are shown in the snippet below:
config:
  logging:
    enableVectorAgent: true  # deploy a vector container with the log agent and mount the vector.toml config
    containers:
      spark-history:
        custom:
          configMap: spark-history-log-config  # containing e.g. a log4j configuration
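To make the conditional behaviour concrete, the following is a simplified, self-contained sketch of the control flow inside an extend_config_map-style helper. A plain BTreeMap stands in for the ConfigMapBuilder, and the LoggingSketch type, its field names, and the file names are assumptions for illustration only:

use std::collections::BTreeMap;

// Stand-ins for the real Logging<C> settings (assumed, simplified).
struct LoggingSketch {
    enable_vector_agent: bool,
    custom_log_config: Option<String>, // contents of the user-supplied ConfigMap, if any
}

fn extend_config_map_sketch(
    logging: &LoggingSketch,
    vector_aggregator_address: Option<&str>,
    cm_data: &mut BTreeMap<String, String>,
) {
    // 1. Product log configuration: a user-supplied custom config wins,
    //    otherwise fall back to a framework default (placeholder here).
    let log_config = logging
        .custom_log_config
        .clone()
        .unwrap_or_else(|| "rootLogger.level = INFO".to_owned());
    cm_data.insert("log4j2.properties".to_owned(), log_config);

    // 2. Vector configuration: only rendered when the agent is enabled
    //    and an aggregator address was resolved earlier.
    if logging.enable_vector_agent {
        if let Some(address) = vector_aggregator_address {
            cm_data.insert(
                "vector.toml".to_owned(),
                format!("# forwards collected logs to the aggregator at {address}\n"),
            );
        }
    }
}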
If a specific logging configuration is not yet available in the framework, it can be added directly in product_logging.rs. For instance, a very simple java.util.logging (JUL) configuration could be defined locally like this:
fn create_java_logging_config() -> String {
    format!(
        r#"
handlers=java.util.logging.FileHandler, java.util.logging.ConsoleHandler
.level={root_log_level}
java.util.logging.FileHandler.pattern=/path/to/jul.%g.logger
java.util.logging.FileHandler.limit=50000
java.util.logging.FileHandler.count=10
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.ConsoleHandler.level={console_log_level}
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
"#,
        root_log_level = "INFO",
        console_log_level = "INFO"
    )
}
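The string returned by such a function would then typically be written into the role-group ConfigMap by extend_config_map, for example under a logging.properties entry (the exact file name depends on where the product expects its logging configuration).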
Framework (operator-rs)
As mentioned above, the framework provides pre-defined logging configurations for standard logging libraries and for Vector itself. It also provides a way of capturing standard shell output and writing it to two files (container.stdout.log and container.stderr.log). This is done by calling the function capture_shell_output.
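The exact command string is an implementation detail of operator-rs, but conceptually it amounts to redirecting the container's stdout and stderr into those files before the real entrypoint runs. A hypothetical sketch of such a helper (not the actual capture_shell_output implementation) might look like this:

// Conceptual sketch only; the real command is generated by
// product_logging::framework::capture_shell_output in operator-rs.
fn capture_shell_output_sketch(log_dir: &str, container: &str) -> String {
    // Uses bash process substitution so output still appears on the console
    // while also being written to the per-container log files.
    format!(
        "mkdir -p {log_dir}/{container} && \
         exec > >(tee {log_dir}/{container}/container.stdout.log) \
         2> >(tee {log_dir}/{container}/container.stderr.log)"
    )
}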
vector.toml
The information for this file is retrieved by calling product_logging::framework::create_vector_config, which returns the file contents. Each [sources.*] section declares a logging type or source, e.g.
[sources.files_stdout]
type = "file"
include = ["{STACKABLE_LOG_DIR}/*/*.stdout.log"]
This files_stdout source can be referenced in a [transforms.*] section, several of which can be chained together. A simple example, shown below, takes JUL logging output, sets the log level, extracts the container and file names, and adds a prefix to each message.
[sources.files_jul]
type = "file"
include = ["{STACKABLE_LOG_DIR}/*/*.logger"]
[transforms.processed_files_jul]
inputs = ["files_jul"]
type = "remap"
source = '''
.logger = "ROOT"
.level = "INFO"
'''
[transforms.parsed_logs_jul]
inputs = ["processed_files_jul"]
type = "remap"
source = '''
. |= parse_regex!(.file, r'^{STACKABLE_LOG_DIR}/(?P<container>.*?)/(?P<file>.*?)$')
del(.source_type)
'''
[transforms.extended_logs_jul]
inputs = ["parsed_logs_jul"]
type = "remap"
source = '''
.message = "Java Logging: " + string!(.message)
'''
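The chained transforms are then consumed by a [sinks.*] section in the same generated file, which forwards the normalised events to the aggregator address that was resolved earlier in the reconcile step.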
Product Controller
How do all these parts fit together? Let’s look at where they are applied in the product controller.
- The vector aggregator address is retrieved early on in the reconcile function, where a client object is available:

let vector_aggregator_address = resolve_vector_aggregator_address(&cluster, client)
    .await
    .context(ResolveVectorAggregatorAddressSnafu)?;
- It is then passed through to functions where config maps are created at role-group level, and where extend_config_map (or extend_role_group_config_map in the example below) is called:

extend_role_group_config_map(
    rolegroup,
    vector_aggregator_address,
    &merged_config.logging,
    &mut cm_builder,
)
.context(InvalidLoggingConfigSnafu {
    cm_name: rolegroup.object_name(),
})?;
This can be done in multiple places, as is the case for the spark-k8s-operator, where config maps are defined for the driver and executor pod-templates, as well as for the spark-submit Job.
- If shell capture is required, this is done for each relevant container in the role-group StatefulSet. capture_shell_output returns a command that should normally be the first component of a container’s command arguments:

if let Some(ContainerLogConfig {
    choice: Some(ContainerLogConfigChoice::Automatic(log_config)),
}) = merged_config.logging.containers.get(&Container::Connector)
{
    args.push(product_logging::framework::capture_shell_output(
        STACKABLE_LOG_DIR,
        "edc",
        log_config,
    ));
}
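When enableVectorAgent is set, the product controller also adds a Vector agent container to the role-group Pods and mounts the generated vector.toml (as indicated by the comment in the custom-resource snippet above), so that the captured and product log files are actually picked up and shipped to the aggregator.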