| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- #!/usr/bin/env python3
- """
- Generate DBeaver data-sources.json from sdm status --json output
- Organizes PostgreSQL connections by customer folders
- Excludes readonly connections, includes activate connections
- """
- import subprocess
- import json
- import re
- import uuid
- import os
- from collections import defaultdict
- def run_sdm_status():
- try:
- result = subprocess.run(['sdm', 'status', '--json'], capture_output=True, text=True)
- if result.returncode != 0:
- print(f"Error running sdm status: {result.stderr}")
- return None
- return json.loads(result.stdout)
- except FileNotFoundError:
- print("Error: sdm command not found")
- return None
- except json.JSONDecodeError as e:
- print(f"Error parsing JSON: {e}")
- return None
- def parse_postgres_connections(sdm_data):
- connections = []
-
- for item in sdm_data:
- if item.get('type') == 'postgres':
- name = item.get('name', '')
- address = item.get('address', '')
-
- if 'readonly' in name:
- continue
-
- if ':' in address:
- addr, port = address.split(':')
- port = int(port)
- else:
- continue
-
- env_info = parse_connection_name(name)
- if env_info:
- connections.append({
- 'name': name,
- 'addr': addr,
- 'port': port,
- 'environment': env_info['environment'],
- 'customer': env_info['customer'],
- 'stage': env_info['stage']
- })
-
- return connections
- def parse_connection_name(name):
- pattern = r'oc-([^-]+)-([^-]+)-.*'
- match = re.match(pattern, name)
-
- if not match:
- return None
-
- environment = match.group(1)
- customer = match.group(2)
-
- if environment in ['dev', 'nextrc', 'nextrc2']:
- stage = 'internal'
- elif environment in ['stage', 'stage2', 'uat']:
- stage = 'stage'
- elif environment in ['prod', 'prod2']:
- stage = 'production'
- else:
- stage = 'unknown'
-
- return {
- 'environment': environment,
- 'customer': customer,
- 'stage': stage
- }
- def create_dbeaver_connection(conn, connection_id):
- if conn['customer'] == 'internal' or conn['customer'].startswith('feature'):
- folder = 'internal'
- elif conn['stage'] == 'stage':
- folder = conn['customer']
- elif conn['stage'] == 'production':
- folder = conn['customer']
- else:
- folder = 'other'
-
- display_name = conn['name'].replace('oc-', '').replace('-rds', '').replace('-activate', '')
-
- if 'activate' in conn['name']:
- display_name = f"activate-{display_name}"
-
- if 'activate' in conn['name']:
- default_db = "postgres"
- db_url = f"jdbc:postgresql://{conn['addr']}:{conn['port']}/postgres"
- else:
- default_db = "member_dossier"
- db_url = f"jdbc:postgresql://{conn['addr']}:{conn['port']}/member_dossier"
-
- return {
- "provider": "postgresql",
- "driver": "postgres-jdbc",
- "name": display_name,
- "save-password": True,
- "folder": folder,
- "configuration": {
- "host": conn['addr'],
- "port": str(conn['port']),
- "database": default_db,
- "url": db_url,
- "configurationType": "MANUAL",
- "home": "/bin",
- "type": conn['environment'],
- "closeIdleConnection": False,
- "provider-properties": {
- "@dbeaver-show-non-default-db@": "true",
- "@dbeaver-show-template-db@": "true",
- "@dbeaver-show-unavailable-db@": "true",
- "show-database-statistics": "false",
- "@dbeaver-read-all-data-types-db@": "false",
- "@dbeaver-use-prepared-statements-db@": "false",
- "postgresql.dd.plain.string": "false",
- "postgresql.dd.tag.string": "false"
- },
- "auth-model": "native"
- }
- }
- def group_connections(connections):
- folders = defaultdict(list)
-
- for conn in connections:
- if conn['customer'] == 'internal' or conn['customer'].startswith('feature'):
- folder = 'internal'
- elif conn['stage'] == 'stage':
- folder = conn['customer']
- elif conn['stage'] == 'production':
- folder = conn['customer']
- else:
- folder = 'other'
-
- folders[folder].append(conn)
-
- return folders
- def is_sdm_connection(conn_name):
- """Check if connection name matches SDM pattern (oc-*-*-*)"""
- pattern = r'oc-([^-]+)-([^-]+)-.*'
- return re.match(pattern, conn_name) is not None
- def load_existing_config(output_file):
- """Load existing DBeaver config, return empty if file doesn't exist"""
- if not os.path.exists(output_file):
- return {"folders": {}, "connections": {}}
-
- try:
- with open(output_file, 'r') as f:
- return json.load(f)
- except json.JSONDecodeError:
- print("⚠️ Existing config file is invalid, starting fresh")
- return {"folders": {}, "connections": {}}
- def generate_dbeaver_config(connections, existing_config):
- folders = group_connections(connections)
-
- # Start with existing folders
- folders_config = existing_config.get("folders", {}).copy()
-
- # Add folders for new SDM connections
- for folder_name in folders.keys():
- if folder_name not in folders_config:
- folders_config[folder_name] = {}
-
- # Start with existing connections, filtering out SDM connections
- connections_config = {}
- existing_connections = existing_config.get("connections", {})
- for conn_id, conn in existing_connections.items():
- # Keep non-SDM connections (those that don't start with oc- in their display name)
- display_name = conn.get("name", "")
- # Also check configuration host/type for SDM patterns
- config = conn.get("configuration", {})
- conn_type = config.get("type", "")
-
- # Only keep connections that don't match SDM pattern
- if not is_sdm_connection(display_name) and not is_sdm_connection(conn_type):
- connections_config[conn_id] = conn
-
- # Add new SDM connections
- for conn in connections:
- connection_id = f"postgres-jdbc-{uuid.uuid4().hex[:8]}-{uuid.uuid4().hex[:8]}"
- conn_config = create_dbeaver_connection(conn, connection_id)
- connections_config[connection_id] = conn_config
-
- return {
- "folders": folders_config,
- "connections": connections_config
- }
- def main():
- print("Generating DBeaver data-sources.json from sdm status --json...")
-
- output_file = os.path.expanduser('~/.local/share/DBeaverData/workspace6/Stuzo/.dbeaver/data-sources.json')
-
- # Load existing config to preserve non-SDM connections
- existing_config = load_existing_config(output_file)
- existing_count = len(existing_config.get("connections", {}))
- print(f"📁 Found {existing_count} existing connections")
-
- sdm_data = run_sdm_status()
- if not sdm_data:
- return
-
- connections = parse_postgres_connections(sdm_data)
- print(f"🔍 Found {len(connections)} PostgreSQL connections from SDM")
-
- folders = group_connections(connections)
-
- for folder_name, conns in folders.items():
- print(f" {folder_name}: {len(conns)} connections")
-
- dbeaver_config = generate_dbeaver_config(connections, existing_config)
-
- final_count = len(dbeaver_config["connections"])
- preserved_count = final_count - len(connections)
- print(f"💾 Preserved {preserved_count} non-SDM connections")
- print(f"📊 Total connections: {final_count} ({preserved_count} preserved + {len(connections)} SDM)")
-
- try:
- with open(output_file, 'w') as f:
- json.dump(dbeaver_config, f, indent='\t')
- print(f"✅ Successfully generated {output_file}")
- print("📝 Note: You may need to restart DBeaver to see the new connections")
- except Exception as e:
- print(f"❌ Error writing file: {e}")
- if __name__ == '__main__':
- main()
|