From: Simon Glass <simon.glass@canonical.com>

It is possible to use an LSP to determine which code is used, at least
to some degree. Make a start on this, in the hope that future work may
prove out the concept.

So far I have not found this to be particularly useful, since clangd
does not seem to handle IS_ENABLED() and similar macros when working
out inactive regions.

Co-developed-by: Claude <noreply@anthropic.com>
Signed-off-by: Simon Glass <simon.glass@canonical.com>
---
 tools/codman/lsp.py        | 319 +++++++++++++++++++++++++++++++++++++
 tools/codman/lsp_client.py | 225 ++++++++++++++++++++++++++
 tools/codman/test_lsp.py   | 153 ++++++++++++++++++
 3 files changed, 697 insertions(+)
 create mode 100644 tools/codman/lsp.py
 create mode 100644 tools/codman/lsp_client.py
 create mode 100755 tools/codman/test_lsp.py

diff --git a/tools/codman/lsp.py b/tools/codman/lsp.py
new file mode 100644
index 00000000000..143fe22a7e1
--- /dev/null
+++ b/tools/codman/lsp.py
@@ -0,0 +1,319 @@
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright 2025 Canonical Ltd
+#
+"""LSP-based line-level analysis for source code.
+
+This module provides functionality to analyse which lines in source files
+are active vs inactive based on preprocessor conditionals, using clangd's
+inactive regions feature via the Language Server Protocol (LSP).
+"""
+
+import concurrent.futures
+import json
+import multiprocessing
+import os
+import re
+import tempfile
+import time
+
+from u_boot_pylib import tools, tout
+from analyser import Analyser, FileResult
+from lsp_client import LspClient
+
+
+def create_compile_commands(build_dir, srcdir):
+    """Create compile_commands.json using gen_compile_commands.py.
+
+    Args:
+        build_dir (str): Build directory path
+        srcdir (str): Source directory path
+
+    Returns:
+        list: List of compile command entries
+    """
+    # Use the same pattern as gen_compile_commands.py
+    line_pattern = re.compile(
+        r'^(saved)?cmd_[^ ]*\.o := (?P<command_prefix>.* )'
+        r'(?P<file_path>[^ ]*\.[cS]) *(;|$)')
+
+    compile_commands = []
+
+    # Walk through build directory looking for .cmd files
+    filename_matcher = re.compile(r'^\..*\.cmd$')
+    exclude_dirs = ['.git', 'Documentation', 'include', 'tools']
+
+    for dirpath, dirnames, filenames in os.walk(build_dir, topdown=True):
+        # Prune unwanted directories in place so os.walk() skips them
+        dirnames[:] = [d for d in dirnames if d not in exclude_dirs]
+
+        for filename in filenames:
+            if not filename_matcher.match(filename):
+                continue
+
+            cmd_file = os.path.join(dirpath, filename)
+            try:
+                with open(cmd_file, 'rt', encoding='utf-8') as f:
+                    result = line_pattern.match(f.readline())
+                    if result:
+                        command_prefix = result.group('command_prefix')
+                        file_path = result.group('file_path')
+
+                        # Clean up command prefix (handle escaped #)
+                        prefix = command_prefix.replace(r'\#', '#').replace(
+                            '$(pound)', '#')
+
+                        # Get absolute path to source file
+                        abs_path = os.path.realpath(
+                            os.path.join(srcdir, file_path))
+                        if os.path.exists(abs_path):
+                            compile_commands.append({
+                                'directory': srcdir,
+                                'file': abs_path,
+                                'command': prefix + file_path,
+                            })
+            except (OSError, IOError):
+                continue
+
+    return compile_commands
+
+
+def worker(args):
+    """Analyse a single source file using clangd LSP.
+
+    Args:
+        args (tuple): Tuple of (source_file, client)
+            where client is a shared LspClient instance
+
+    Returns:
+        tuple: (source_file, inactive_regions, error_msg)
+    """
+    source_file, client = args
+
+    try:
+        # Read file content
+        content = tools.read_file(source_file, binary=False)
+
+        # Open the document
+        client.notify('textDocument/didOpen', {
+            'textDocument': {
+                'uri': f'file://{source_file}',
+                'languageId': 'c',
+                'version': 1,
+                'text': content
+            }
+        })
+
+        # Wait for clangd to process and send notifications
+        # Poll for inactive regions notification for this specific file
+        max_wait = 10  # seconds
+        start_time = time.time()
+        inactive_regions = None
+
+        while time.time() - start_time < max_wait:
+            time.sleep(0.1)
+
+            with client.lock:
+                notifications = list(client.notifications)
+                # Clear processed notifications to avoid buildup
+                client.notifications = []
+
+            for notif in notifications:
+                method = notif.get('method', '')
+                if method == 'textDocument/clangd.inactiveRegions':
+                    params = notif.get('params', {})
+                    uri = params.get('uri', '')
+                    # Check if this notification is for our file
+                    if uri == f'file://{source_file}':
+                        inactive_regions = params.get('inactiveRegions', [])
+                        break
+
+            if inactive_regions is not None:
+                break
+
+        # Close the document to free resources
+        client.notify('textDocument/didClose', {
+            'textDocument': {
+                'uri': f'file://{source_file}'
+            }
+        })
+
+        if inactive_regions is None:
+            # No inactive regions notification received
+            # This could mean the file has no inactive code
+            inactive_regions = []
+
+        return (source_file, inactive_regions, None)
+
+    except Exception as e:
+        return (source_file, None, str(e))
+
+
+class LspAnalyser(Analyser):  # pylint: disable=too-few-public-methods
+    """Analyser that uses clangd LSP to determine active lines.
+
+    This analyser uses the Language Server Protocol (LSP) with clangd to
+    identify inactive preprocessor regions in source files.
+    """
+
+    def __init__(self, build_dir, srcdir, used_sources, keep_temps=False):
+        """Set up the LSP analyser.
+
+        Args:
+            build_dir (str): Build directory containing .o and .cmd files
+            srcdir (str): Path to source root directory
+            used_sources (set): Set of source files that are compiled
+            keep_temps (bool): If True, keep temporary files for debugging
+        """
+        super().__init__(srcdir, keep_temps)
+        self.build_dir = build_dir
+        self.used_sources = used_sources
+
+    def extract_inactive_regions(self, jobs=None):
+        """Extract inactive regions from source files using clangd.
+
+        Args:
+            jobs (int): Number of parallel jobs (None = use all CPUs)
+
+        Returns:
+            dict: Mapping of source file paths to lists of inactive regions
+        """
+        # Create compile commands database
+        tout.progress('Building compile commands database...')
+        compile_commands = create_compile_commands(self.build_dir, self.srcdir)
+
+        # Filter to only .c and .S files that we need to analyse
+        filtered_files = []
+        for cmd in compile_commands:
+            source_file = cmd['file']
+            if source_file in self.used_sources:
+                if source_file.endswith('.c') or source_file.endswith('.S'):
+                    filtered_files.append(source_file)
+
+        tout.progress(f'Found {len(filtered_files)} source files to analyse')
+
+        if not filtered_files:
+            return {}
+
+        inactive = {}
+        errors = []
+
+        # Create a single clangd instance and use it for all files
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Write compile commands database
+            compile_db = os.path.join(tmpdir, 'compile_commands.json')
+            with open(compile_db, 'w', encoding='utf-8') as f:
+                json.dump(compile_commands, f)
+
+            # Start a single clangd server
+            tout.progress('Starting clangd server...')
+            with LspClient(['clangd', '--log=error',
+                            f'--compile-commands-dir={tmpdir}']) as client:
+                result = client.init(f'file://{self.srcdir}')
+                if not result:
+                    tout.error('Failed to start clangd')
+                    return {}
+
+                # Determine number of workers
+                if jobs is None:
+                    jobs = min(multiprocessing.cpu_count(), len(filtered_files))
+                elif jobs <= 0:
+                    jobs = 1
+
+                tout.progress(f'Processing files with {jobs} workers...')
+
+                # Use ThreadPoolExecutor to process files in parallel
+                # (threads share the same clangd client)
+                with concurrent.futures.ThreadPoolExecutor(
+                        max_workers=jobs) as executor:
+                    # Submit all tasks
+                    future_to_file = {
+                        executor.submit(worker, (source_file, client)):
+                            source_file
+                        for source_file in filtered_files
+                    }
+
+                    # Collect results as they complete
+                    completed = 0
+                    for future in concurrent.futures.as_completed(
+                            future_to_file):
+                        source_file = future_to_file[future]
+                        completed += 1
+                        tout.progress(
+                            f'Processing {completed}/{len(filtered_files)}: ' +
+                            f'{os.path.basename(source_file)}...')
+
+                        try:
+                            source_file_result, inactive_regions, error_msg = (
+                                future.result())
+
+                            if error_msg:
+                                errors.append(f'{source_file}: {error_msg}')
+                            elif inactive_regions is not None:
+                                inactive[source_file_result] = (
+                                    inactive_regions)
+                        except Exception as exc:
+                            errors.append(f'{source_file}: {exc}')
+
+        # Report any errors
+        if errors:
+            for error in errors[:10]:  # Show first 10 errors
+                tout.error(error)
+            if len(errors) > 10:
+                tout.error(f'... and {len(errors) - 10} more errors')
+            tout.warning(f'Failed to analyse {len(errors)} file(s) with LSP')
+
+        return inactive
+
+    def process(self, jobs=None):
+        """Perform line-level analysis using clangd LSP.
+
+        Args:
+            jobs (int): Number of parallel jobs (None = use all CPUs)
+
+        Returns:
+            dict: Mapping of source file paths to FileResult named tuples
+        """
+        tout.progress('Extracting inactive regions using clangd LSP...')
+        inactive_regions_map = self.extract_inactive_regions(jobs)
+
+        file_results = {}
+        for source_file in self.used_sources:
+            # Only process .c and .S files
+            if not (source_file.endswith('.c') or source_file.endswith('.S')):
+                continue
+
+            abs_path = os.path.realpath(source_file)
+            inactive_regions = inactive_regions_map.get(abs_path, [])
+
+            # Count total lines in the file
+            total_lines = self.count_lines(abs_path)
+
+            # Create line status dict
+            line_status = {}
+            # Set up all lines as active
+            for i in range(1, total_lines + 1):
+                line_status[i] = 'active'
+
+            # Mark inactive lines based on regions
+            # LSP uses 0-indexed line numbers
+            for region in inactive_regions:
+                start_line = region['start']['line'] + 1
+                end_line = region['end']['line'] + 1
+                # Mark lines as inactive (inclusive range)
+                for line_num in range(start_line, end_line + 1):
+                    if line_num <= total_lines:
+                        line_status[line_num] = 'inactive'
+
+            inactive_lines = len([s for s in line_status.values()
+                                  if s == 'inactive'])
+            active_lines = total_lines - inactive_lines
+
+            file_results[abs_path] = FileResult(
+                total_lines=total_lines,
+                active_lines=active_lines,
+                inactive_lines=inactive_lines,
+                line_status=line_status
+            )
+
+        tout.info(f'Analysed {len(file_results)} files using clangd LSP')
+        return file_results
diff --git a/tools/codman/lsp_client.py b/tools/codman/lsp_client.py
new file mode 100644
index 00000000000..954879a651e
--- /dev/null
+++ b/tools/codman/lsp_client.py
@@ -0,0 +1,225 @@
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright 2025 Canonical Ltd
+#
+"""Minimal LSP (Language Server Protocol) client for clangd.
+
+This module provides a simple JSON-RPC 2.0 client for communicating with
+LSP servers like clangd. It focuses on the specific functionality needed
+for analysing inactive preprocessor regions.
+"""
+
+import json
+import subprocess
+import threading
+import time
+from typing import Any, Dict, Optional
+
+
+class LspClient:
+    """Minimal LSP client for JSON-RPC 2.0 communication.
+
+    This client handles the basic LSP protocol communication over
+    stdin/stdout with a language server process.
+
+    Attributes:
+        process: The language server subprocess
+        next_id: Counter for JSON-RPC request IDs
+        responses: Dict mapping request IDs to response data
+        lock: Thread lock for response dictionary
+        reader_thread: Background thread reading server responses
+    """
+
+    def __init__(self, server_command):
+        """Init the LSP client and start the server.
+
+        Args:
+            server_command (list): Command to start the LSP server
+                (e.g., ['clangd', '--log=error'])
+        """
+        self.process = subprocess.Popen(
+            server_command,
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True,
+            bufsize=0
+        )
+        self.next_id = 1
+        self.responses = {}
+        self.notifications = []
+        self.lock = threading.Lock()
+        self.running = True
+
+        # Start background thread to read responses
+        self.reader_thread = threading.Thread(target=self._read_responses)
+        self.reader_thread.daemon = True
+        self.reader_thread.start()
+
+    def _read_responses(self):
+        """Background thread to read responses from the server"""
+        while self.running and self.process.poll() is None:
+            try:
+                # Read headers
+                headers = {}
+                while True:
+                    line = self.process.stdout.readline()
+                    if not line or line == '\r\n' or line == '\n':
+                        break
+                    if ':' in line:
+                        key, value = line.split(':', 1)
+                        headers[key.strip()] = value.strip()
+
+                if 'Content-Length' not in headers:
+                    continue
+
+                # Read content
+                content_length = int(headers['Content-Length'])
+                content = self.process.stdout.read(content_length)
+
+                if not content:
+                    break
+
+                # Parse JSON
+                message = json.loads(content)
+
+                # Store response or notification
+                with self.lock:
+                    if 'id' in message:
+                        # Response to a request
+                        self.responses[message['id']] = message
+                    else:
+                        # Notification from server
+                        self.notifications.append(message)
+
+            except (json.JSONDecodeError, ValueError):
+                continue
+            except Exception:
+                break
+
+    def _send_message(self, message: Dict[str, Any]):
+        """Send a JSON-RPC message to the server.
+
+        Args:
+            message: JSON-RPC message dictionary
+        """
+        content = json.dumps(message)
+        headers = f'Content-Length: {len(content)}\r\n\r\n'
+        self.process.stdin.write(headers + content)
+        self.process.stdin.flush()
+
+    def request(self, method: str, params: Optional[Dict] = None,
+                timeout: int = 30) -> Optional[Dict]:
+        """Send a JSON-RPC request and wait for response.
+
+        Args:
+            method: LSP method name (e.g., 'initialize')
+            params: Method parameters dictionary
+            timeout: Timeout in seconds (default: 30)
+
+        Returns:
+            Response dictionary, or None on timeout/error
+        """
+        request_id = self.next_id
+        self.next_id += 1
+
+        message = {
+            'jsonrpc': '2.0',
+            'id': request_id,
+            'method': method,
+        }
+        if params:
+            message['params'] = params
+
+        self._send_message(message)
+
+        # Wait for response
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            with self.lock:
+                if request_id in self.responses:
+                    response = self.responses.pop(request_id)
+                    if 'result' in response:
+                        return response['result']
+                    if 'error' in response:
+                        raise RuntimeError(
+                            f"LSP error: {response['error']}")
+                    return response
+            time.sleep(0.01)
+
+        return None
+
+    def notify(self, method: str, params: Optional[Dict] = None):
+        """Send a JSON-RPC notification (no response expected).
+
+        Args:
+            method: LSP method name
+            params: Method parameters dictionary
+        """
+        message = {
+            'jsonrpc': '2.0',
+            'method': method,
+        }
+        if params:
+            message['params'] = params
+
+        self._send_message(message)
+
+    def init(self, root_uri: str, capabilities: Optional[Dict] = None) -> Dict:
+        """Send initialize request to the server.
+
+        Args:
+            root_uri: Workspace root URI (e.g., 'file:///path/to/workspace')
+            capabilities: Client capabilities dict
+
+        Returns:
+            Server capabilities from initialize response
+        """
+        if capabilities is None:
+            capabilities = {
+                'textDocument': {
+                    'semanticTokens': {
+                        'requests': {
+                            'full': True
+                        }
+                    },
+                    'publishDiagnostics': {},
+                    'inactiveRegions': {
+                        'refreshSupport': False
+                    }
+                }
+            }
+
+        result = self.request('initialize', {
+            'processId': None,
+            'rootUri': root_uri,
+            'capabilities': capabilities
+        })
+
+        # Send initialized notification
+        self.notify('initialized', {})
+
+        return result
+
+    def shutdown(self):
+        """Shut down the language server"""
+        self.request('shutdown')
+        self.notify('exit')
+        self.running = False
+        if self.process:
+            self.process.wait(timeout=5)
+            # Close file descriptors to avoid ResourceWarnings
+            if self.process.stdin:
+                self.process.stdin.close()
+            if self.process.stdout:
+                self.process.stdout.close()
+            if self.process.stderr:
+                self.process.stderr.close()
+
+    def __enter__(self):
+        """Context manager entry"""
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Context manager exit - ensure cleanup"""
+        self.shutdown()
diff --git a/tools/codman/test_lsp.py b/tools/codman/test_lsp.py
new file mode 100755
index 00000000000..1070ce655fb
--- /dev/null
+++ b/tools/codman/test_lsp.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright 2025 Canonical Ltd
+#
+"""Test script for LSP client with clangd"""
+
+import json
+import os
+import sys
+import tempfile
+import time
+
+# Add parent directory to path
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from lsp_client import LspClient  # pylint: disable=wrong-import-position
+
+
+def test_clangd():
+    """Test basic clangd functionality"""
+    # Create a temporary directory with a simple C file
+    with tempfile.TemporaryDirectory() as tmpdir:
+        # Create a C file with CONFIG-style inactive code
+        test_file = os.path.join(tmpdir, 'test.c')
+        with open(test_file, 'w', encoding='utf-8') as f:
+            f.write('''#include <stdio.h>
+
+// Simulate U-Boot style CONFIG options
+#define CONFIG_FEATURE_A 1
+
+void always_compiled(void)
+{
+	printf("Always here\\n");
+}
+
+#ifdef CONFIG_FEATURE_A
+void feature_a_code(void)
+{
+	printf("Feature A enabled\\n");
+}
+#endif
+
+#ifdef CONFIG_FEATURE_B
+void feature_b_code(void)
+{
+	printf("Feature B enabled (THIS SHOULD BE INACTIVE)\\n");
+}
+#endif
+
+#if 0
+void disabled_debug_code(void)
+{
+	printf("Debug code (INACTIVE)\\n");
+}
+#endif
+''')
+
+        # Create compile_commands.json
+        compile_commands = [
+            {
+                'directory': tmpdir,
+                'command': f'gcc -c {test_file}',
+                'file': test_file
+            }
+        ]
+        compile_db = os.path.join(tmpdir, 'compile_commands.json')
+        with open(compile_db, 'w', encoding='utf-8') as f:
+            json.dump(compile_commands, f)
+
+        # Create .clangd config to enable inactive regions
+        clangd_config = os.path.join(tmpdir, '.clangd')
+        with open(clangd_config, 'w', encoding='utf-8') as f:
+            f.write('''InactiveRegions:
+  Opacity: 0.55
+''')
+
+        print(f'Created test file: {test_file}')
+        print(f'Created compile DB: {compile_db}')
+        print(f'Created clangd config: {clangd_config}')
+
+        # Start clangd
+        print('\nStarting clangd...')
+        with LspClient(['clangd', '--log=error',
+                        f'--compile-commands-dir={tmpdir}']) as client:
+            print('Initialising...')
+            result = client.init(f'file://{tmpdir}')
+            print(f'Server capabilities: {result.get("capabilities", {}).keys()}')
+
+            # Open the document
+            print(f'\nOpening document: {test_file}')
+            with open(test_file, 'r', encoding='utf-8') as f:
+                content = f.read()
+
+            client.notify('textDocument/didOpen', {
+                'textDocument': {
+                    'uri': f'file://{test_file}',
+                    'languageId': 'c',
+                    'version': 1,
+                    'text': content
+                }
+            })
+
+            # Wait for clangd to index the file
+            print('\nWaiting for clangd to index file...')
+            time.sleep(3)
+
+            # Check for inactive regions notification
+            print('\nChecking for inactive regions notification...')
+            with client.lock:
+                notifications = list(client.notifications)
+
+            print(f'Received {len(notifications)} notifications:')
+            inactive_regions = None
+            for notif in notifications:
+                method = notif.get('method', 'unknown')
+                print(f'  - {method}')
+
+                # Look for the clangd inactive regions extension
+                if method == 'textDocument/clangd.inactiveRegions':
+                    params = notif.get('params', {})
+                    inactive_regions = params.get('inactiveRegions', [])
+                    print(f'    Found {len(inactive_regions)} inactive regions!')
+
+            if inactive_regions:
+                print('\nInactive regions:')
+                for region in inactive_regions:
+                    start = region['start']
+                    end = region['end']
+                    start_line = start['line'] + 1  # LSP is 0-indexed
+                    end_line = end['line'] + 1
+                    print(f'  Lines {start_line}-{end_line}')
+            else:
+                print('\nNo inactive regions received (feature may not be enabled)')
+
+            # Also show the file with line numbers for reference
+            print('\nFile contents:')
+            for i, line in enumerate(content.split('\n'), 1):
+                print(f'{i:3}: {line}')
+
+            print('\nTest completed!')
+
+            # Check clangd stderr for any errors
+            print('\n=== Clangd stderr output ===')
+            stderr_output = client.process.stderr.read()
+            if stderr_output:
+                print(stderr_output[:1000])
+            else:
+                print('(no stderr output)')
+
+
+if __name__ == '__main__':
+    test_clangd()
-- 
2.43.0
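
For reference, clangd's inactiveRegions extension only reports regions that
the preprocessor actually skips, which is why IS_ENABLED() guards are missed:
the guarded code is always compiled and only discarded later by dead-code
elimination. The sketch below is a hypothetical, self-contained illustration
(CONFIG_FEATURE_B, CONFIG_FEATURE_C, run_features() and the simplified
IS_ENABLED() stub are made up for the example; U-Boot's real macro lives in
include/linux/kconfig.h):

#include <stdio.h>

/* Simplified stand-in for U-Boot's IS_ENABLED(); assumes the option
 * expands to 0 or 1 rather than possibly being undefined
 */
#define CONFIG_FEATURE_B 0
#define IS_ENABLED(option) (option)

void run_features(void)
{
#ifdef CONFIG_FEATURE_C
	/* CONFIG_FEATURE_C is not defined, so the preprocessor skips this
	 * block and clangd reports it as an inactive region
	 */
	printf("skipped by the preprocessor\n");
#endif

	if (IS_ENABLED(CONFIG_FEATURE_B)) {
		/* Always compiled and only removed by dead-code elimination,
		 * so clangd never reports this block as inactive, even though
		 * CONFIG_FEATURE_B is 0
		 */
		printf("compiled but unreachable\n");
	}
}

int main(void)
{
	run_features();

	return 0;
}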