403Webshell
Server IP : 103.119.228.120  /  Your IP : 3.138.170.67
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /lib/mysqlsh/plugins/debug/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /lib/mysqlsh/plugins/debug/collect_diagnostics.py
# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation.  The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful,  but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

from re import X
from typing import List, Tuple, Optional
from mysqlsh import globals
from mysqlsh import mysql, Error
import socket
import zipfile
import yaml
import os
import time
import sys

from debug.host_info import collect_ping_stats, collect_host_info
from debug.sql_collector import make_zipinfo, InstanceSession, collect_diagnostics, collect_diagnostics_once, DiagnosticsSession
from debug.sql_collector import collect_innodb_cluster_accounts, collect_cluster_metadata, collect_error_log_sql, collect_schema_stats, collect_slow_queries, get_topology_members, collect_table_info, explain_heatwave_query, explain_query
from debug.host_info import ShellExecutor
from debug.sql_collector import SQLExecutor


def repr_yaml_text(self, tag, value, style=None):
    """Build a YAML scalar node, using block style ('|') for multi-line text.

    Written as a replacement for a representer's ``represent_scalar``
    method: when no explicit style is requested, values containing a newline
    get the literal block style, everything else gets the representer's
    ``default_style``.
    """
    if style is None:
        style = '|' if "\n" in value else self.default_style

    scalar = yaml.representer.ScalarNode(tag, value, style=style)

    # Register the node under the current alias key, if one is set.
    alias = self.alias_key
    if alias is not None:
        self.represented_objects[alias] = scalar
    return scalar


# Module-level monkeypatch: as soon as this module is imported,
# BaseRepresenter.represent_scalar is replaced with repr_yaml_text (above),
# which emits strings containing newlines in '|' block style.
yaml.representer.BaseRepresenter.represent_scalar = repr_yaml_text


def copy_local_file(zf: zipfile.ZipFile, path: str, source_path: str):
    """Copy the local file `source_path` into the archive as entry `path`.

    The source is streamed in 1 MiB chunks. The copy is best-effort: if it
    fails, a warning is printed and the error plus its traceback are written
    into the archive entry instead, so collection continues.

    Only ``Exception`` is handled this way; KeyboardInterrupt and SystemExit
    propagate so that the user can still abort a long-running copy.
    """
    with zf.open(make_zipinfo(path), "w") as f:
        try:
            with open(source_path, "rb") as inf:
                while True:
                    data = inf.read(1024*1024)
                    if not data:
                        break
                    f.write(data)
        except Exception:
            import traceback
            print(f"WARNING: Could not copy '{source_path}'\n")
            f.write(f"Error copying file {source_path}\n".encode("utf-8"))
            f.write(traceback.format_exc().encode("utf-8"))


def collect_error_log(zf: zipfile.ZipFile, path: str, *, local_target: bool, session: InstanceSession, ignore_errors: bool):
    """Store the server's error log in the archive as entry `path`.

    For a local target the log file named by @@log_error (resolved against
    @@datadir when relative) is copied from disk. Otherwise the log is
    collected through SQL via collect_error_log_sql().

    Returns False only when the SQL-based collection reports failure; True in
    every other case (including a local copy, whose own errors are recorded
    inside the archive entry by copy_local_file()).
    """
    if not local_target:
        if collect_error_log_sql(zf, path, session, ignore_errors=ignore_errors):
            return True
        print(
            f"MySQL error logs could not be collected for {session}, please include error.log files if reporting bugs or seeking assistance")
        return False

    sys_datadir, sys_log_error = session.run_sql(
        "select @@datadir, @@log_error").fetch_one()

    # A relative log_error setting is interpreted relative to the data directory.
    log_path = sys_log_error if os.path.isabs(sys_log_error) \
        else os.path.join(sys_datadir, sys_log_error)

    print(f" - Copying MySQL error log file ({log_path})")
    copy_local_file(zf, path, log_path)
    return True


def collect_member_info(zf: zipfile.ZipFile, prefix: str, session: InstanceSession, slow_queries: bool,
                        ignore_errors: bool, benchmark: bool = False, local_target: bool = False):
    """Write identification data for one server instance into the archive.

    Entries written (all names prefixed with `prefix`):
      - ``uri``: the session URI
      - ``instance``: host/port, UUIDs, version, time zone data and
        capability flags read from the session
      - ``error_log``: via collect_error_log()
      - slow query data via collect_slow_queries(), when `slow_queries` is set

    When `benchmark` is true, a SQL BENCHMARK() call is executed and its
    execution time is appended to the ``instance`` entry.
    """
    with zf.open(make_zipinfo(f"{prefix}uri"), "w") as f:
        f.write(f"{session.uri}\n".encode("utf-8"))

    (utc_time, local_time, time_zone, sys_tz, tz_offs, hostname, port,
     report_host, report_port, server_uuid, server_id, version) = session.run_sql(
        "select utc_timestamp(), now(), @@time_zone, @@system_time_zone, cast(TIMEDIFF(NOW(), UTC_TIMESTAMP()) as char), @@hostname, @@port, @@report_host, @@report_port, @@server_uuid, @@server_id, concat(@@version_comment, ' ', @@version)").fetch_one()

    try:
        sys_version = session.run_sql(
            "select sys_version from sys.version").fetch_one()[0]
    except Exception:
        # Any failure of this query is reported as "version unknown".
        # Deliberately not a bare except: Ctrl-C must still abort collection.
        sys_version = "N/A"

    bm_expr = None
    bm_time = None
    if benchmark:
        print("Executing BENCHMARK()...")
        # -- should take less than 20 seconds
        loop_count = 50000000
        # speed up tests
        if sys.executable.endswith("mysqlshrec"):
            loop_count = 100
        # Built once so the executed statement and the reported label agree.
        bm_expr = f"BENCHMARK({loop_count},(1234*5678/37485-1298+8596^2))"
        bm_time = session.run_sql(f"SELECT {bm_expr}").get_execution_time()

    with zf.open(make_zipinfo(f"{prefix}instance"), "w") as f:
        lines = [
            f"Hostname: {hostname} (report_host={report_host})\n",
            f"Port: {port} (report_port={report_port})\n",
            f"Server UUID: {server_uuid}\n",
            f"Server ID: {server_id}\n",
            f"Connection Endpoint: {session.uri}\n",
            f"Version: {version}\n",
            f"UTC Time: {utc_time}\n",
            f"Local Time: {local_time}\n",
            f"Time Zone: {time_zone}\n",
            f"System Time Zone: {sys_tz}\n",
            f"Time Zone Offset: {tz_offs}\n",
            f"SYS Schema Version: {sys_version}\n",
            f"PFS Enabled: {session.has_pfs}\n",
            f"Has InnoDB Cluster: {session.has_innodbcluster}\n",
            f"Has NDB: {session.has_ndb}\n",
            f"Has Rapid: {session.has_rapid}\n",
        ]
        if benchmark:
            lines.append(f"{bm_expr}: {bm_time}\n")
        f.write("".join(lines).encode("utf-8"))

    collect_error_log(zf, f"{prefix}error_log",
                      local_target=local_target,
                      session=session, ignore_errors=ignore_errors)

    if slow_queries:
        collect_slow_queries(zf, f"{prefix}", session,
                             ignore_errors=ignore_errors)


def normalize(data):
    """Recursively convert shell container objects into plain dicts/lists.

    Dispatch is on the type *name* of `data`:
      - "Dict" / "List" are converted with dict() / list();
      - "Object" whose ``__mysqlsh_classname__`` is "Options" becomes a dict
        of its non-callable members (looked up with ``data[name]``).
    Plain dicts and lists are then walked recursively; anything else is
    returned unchanged.
    """
    type_name = getattr(type(data), "__name__", None)

    if type_name == "Object":
        if getattr(data, "__mysqlsh_classname__", None) == "Options":
            return {name: normalize(data[name])
                    for name in dir(data)
                    if not callable(data[name])}
    elif type_name == "Dict":
        data = dict(data)
    elif type_name == "List":
        data = list(data)

    if isinstance(data, dict):
        return {key: normalize(item) for key, item in data.items()}
    if isinstance(data, list):
        return [normalize(item) for item in data]
    return data


def collect_basic_info(zf: zipfile.ZipFile, prefix: str, session: InstanceSession, info: dict = {}, shell_logs: bool = True):
    """Write ``shell_info.yaml`` and, optionally, the shell log to the archive.

    `info` is copied (never mutated) and extended with the shell version, the
    normalized shell options, the session URI and the local hostname. When
    `shell_logs` is true and the shell's ``logFile`` option points at an
    existing file, that file is copied in as ``mysqlsh.log``.
    """
    shell = globals.shell

    shell_info = dict(info)
    shell_info.update({
        "version": shell.version,
        "options": normalize(shell.options),
        "destination": session.uri,
        "hostname": socket.gethostname(),
    })

    with zf.open(make_zipinfo(f"{prefix}shell_info.yaml"), "w") as f:
        f.write(yaml.dump(shell_info).encode("utf-8"))

    if not shell_logs:
        return

    print("Copying shell log file...")
    log_file = shell.options["logFile"]
    if log_file and os.path.exists(log_file):
        copy_local_file(zf, f"{prefix}mysqlsh.log", log_file)


def collect_member(zf: zipfile.ZipFile, prefix: str,
                   session: InstanceSession,
                   innodb_mutex: bool = False,
                   slow_queries: bool = False,
                   ignore_errors: bool = False,
                   local_target: bool = False,
                   custom_sql: List[str] = []):
    """Collect instance info and one-shot diagnostics for a single server.

    Returns the rows of ``show global variables`` (as fetched with
    fetch_one_object) so callers can compare them across instances.
    """
    # Read all global variables first, before anything else is collected.
    result = session.run_sql("show global variables")
    global_variables = list(iter(result.fetch_one_object, None))

    collect_member_info(zf, prefix, session,
                        slow_queries=slow_queries,
                        ignore_errors=ignore_errors,
                        local_target=local_target)

    collect_diagnostics_once(zf, prefix, session,
                             innodb_mutex=innodb_mutex,
                             custom_sql=custom_sql)

    return global_variables


def collect_topology_diagnostics(zf: zipfile.ZipFile, prefix: str, session: InstanceSession, creds,
                                 innodb_mutex: bool = False,
                                 slow_queries: bool = False,
                                 ignore_errors: bool = False,
                                 local_instance_id: int = -1,
                                 custom_sql: List[str] = []):
    """Collect diagnostics from every member returned by get_topology_members().

    For each (instance_id, endpoint) pair a new session is opened using
    `creds` with host/port taken from the endpoint, and collect_member() is
    run with the entry prefix ``{prefix}{instance_id}.``.

    Members that cannot be reached are recorded in
    ``{prefix}{instance_id}.connect_error.txt`` and skipped, except for an
    access-denied error, which is re-raised.

    Returns a list with one dict per collected member, holding
    ``instance_id``, ``endpoint`` and ``global_variables``.
    """
    instance_data = []
    members = get_topology_members(session)
    for instance_id, endpoint in members:
        data = {
            "instance_id": instance_id,
            "endpoint": endpoint
        }
        print(f"Collecting information from {endpoint}...")
        try:
            dest = dict(creds)
            # Split on the LAST colon only: an IPv6 endpoint such as
            # "[::1]:3306" contains colons in the host part, and a plain
            # split(":") would raise an uncaught ValueError.
            host, port = endpoint.rsplit(":", 1)
            # NOTE(review): assumes dictionary-form connection options expect
            # a bare IPv6 address without the URI-style brackets - confirm.
            if host.startswith("[") and host.endswith("]"):
                host = host[1:-1]
            dest["host"], dest["port"] = host, port

            # A separate name keeps the caller's `session` parameter intact.
            member_session = InstanceSession(mysql.get_session(dest))
        except Error as e:
            print(f"Could not connect to {endpoint}: {e}")
            with zf.open(make_zipinfo(f"{prefix}{instance_id}.connect_error.txt"), "w") as f:
                f.write(f"# {endpoint}\n".encode("utf-8"))
                f.write((str(e)+"\n").encode("utf-8"))
            # Wrong credentials will fail for every member; stop right away.
            if e.code == mysql.ErrorCode.ER_ACCESS_DENIED_ERROR:
                raise
            continue

        data["global_variables"] = collect_member(
            zf, f"{prefix}{instance_id}.", member_session,
            innodb_mutex=innodb_mutex, slow_queries=slow_queries,
            ignore_errors=ignore_errors,
            local_target=instance_id == local_instance_id,
            custom_sql=custom_sql)
        print()

        instance_data.append(data)

    return instance_data


def dump_cluster_status(zf, prefix: str, cluster_type: Optional[str]):
    """Write extended status of the cluster / cluster set / replica set.

    Which status calls are attempted depends on `cluster_type`:
      - None: all three
      - "gr": cluster only
      - "cs": cluster and cluster set
      - "ar": replica set only
    Each successful call is stored as ``<name>_status.yaml``; a failing one is
    stored as ``<name>_status.error`` containing the call and the error text.
    """
    # (cluster types the entry applies to, dba getter, entry stem, error header)
    targets = (
        ((None, "gr", "cs"), "get_cluster", "cluster_status",
         b"cluster.status({'extended':2})\n"),
        ((None, "cs"), "get_cluster_set", "cluster_set_status",
         b"cluster_set.status({'extended':2})\n"),
        ((None, "ar"), "get_replica_set", "replica_set_status",
         b"replica_set.status({'extended':2})\n"),
    )

    for applies_to, getter_name, stem, error_header in targets:
        if cluster_type not in applies_to:
            continue
        try:
            status = getattr(globals.dba, getter_name)().status({"extended": 2})
            with zf.open(make_zipinfo(f"{prefix}{stem}.yaml"), "w") as f:
                # NOTE(review): eval(repr(...)) is used to turn the shell
                # status object into plain Python data. eval on data that
                # originates from a server is risky; consider a structural
                # conversion instead.
                f.write(yaml.dump(eval(repr(status))).encode("utf-8"))
        except Exception as e:
            with zf.open(make_zipinfo(f"{prefix}{stem}.error"), "w") as f:
                f.write(error_header)
                f.write(f"{e}\n".encode("utf-8"))


def default_filename():
    """Return a timestamped archive name: mysql-diagnostics-YYYYmmdd-HHMMSS.zip."""
    timestamp = time.strftime('%Y%m%d-%H%M%S')
    return "mysql-diagnostics-" + timestamp + ".zip"


def diff_multi_tables(topology_info, get_data, get_key, get_value):
    """Find keys whose values are not identical across all instances.

    Parameters:
      topology_info: iterable of per-instance records
      get_data: callable(instance) -> iterable of rows
      get_key / get_value: callables extracting key and value from a row

    Returns a list of ``(key, values)`` tuples, where ``values`` always has
    exactly one slot per instance (in `topology_info` order) and ``None``
    marks an instance that has no row for that key. A key missing from any
    instance counts as differing. Note that a missing row and a row whose
    value is ``None`` are not distinguished.
    """
    instances = list(topology_info)
    instance_count = len(instances)

    fields = {}
    # dict used as an insertion-ordered set of differing keys
    different_fields = {}
    for i, instance in enumerate(instances):
        for row in get_data(instance):
            key = get_key(row)
            value = get_value(row)
            values = fields.get(key)
            if values is None:
                # Fixed-width slot list keeps every value aligned with its
                # instance, even if intermediate instances lack the key.
                values = fields[key] = [None] * instance_count
                if i > 0:
                    # absent from at least the first instance
                    different_fields[key] = True
            elif i > 0 and values[i - 1] != value:
                # differs from (or is missing in) the previous instance
                different_fields[key] = True
            values[i] = value

    # Keys missing only from trailing instances are not seen by the loop
    # above, since no row triggers a comparison for them.
    for key, values in fields.items():
        if key not in different_fields and any(v != values[0] for v in values[1:]):
            different_fields[key] = True

    return [(key, fields[key]) for key in different_fields]


def dump_diff(f, key_label, instance_labels, diff, header):
    """Write `diff` to the binary stream `f` as an aligned text table.

    Parameters:
      f: writable binary stream (text is encoded as UTF-8)
      key_label: title of the first column
      instance_labels: one column title per instance
      diff: iterable of ``(key, values)`` with at most one value per instance;
        ``None`` values (instance has no such key) are rendered as empty cells
      header: optional text written first as a ``# ...`` comment line

    Column widths take both the titles and the cell contents into account.
    """
    def as_text(value):
        # None marks a value that is missing on that instance.
        return "" if value is None else str(value)

    column_count = len(instance_labels)

    # Render every cell up front, padding short rows so the table is rectangular.
    rows = []
    for key, values in diff:
        cells = [as_text(v) for v in values]
        cells += [""] * (column_count - len(cells))
        rows.append((str(key), cells))

    if header:
        f.write(f"# {header}\n".encode("utf-8"))

    # find widths of each column
    key_width = max([len(key_label)] + [len(key) for key, _ in rows])
    value_widths = [len(label) for label in instance_labels]
    for _, cells in rows:
        for i, cell in enumerate(cells):
            value_widths[i] = max(value_widths[i], len(cell))

    title_cells = [key_label.ljust(key_width)]
    title_cells += [label.ljust(width)
                    for label, width in zip(instance_labels, value_widths)]
    title = " | ".join(title_cells)
    f.write((title + "\n").encode("utf-8"))
    # Ruler matches the title line exactly (the newline is not counted).
    f.write(("=" * len(title) + "\n").encode("utf-8"))

    for key, cells in rows:
        line = [key.ljust(key_width)]
        line += [cell.ljust(width) for cell, width in zip(cells, value_widths)]
        f.write((" | ".join(line) + "\n").encode("utf-8"))


def analyze_topology_data(zf: zipfile.ZipFile, prefix, topology_info):
    """Write ``diff.global_variables.txt``: global variables that differ
    between the instances in `topology_info`.

    The archive entry is always created; it stays empty when no variable
    differs.
    """
    def instance_rows(instance):
        return instance["global_variables"]

    def variable_name(row):
        return row["Variable_name"]

    def variable_value(row):
        return row["Value"]

    diff = diff_multi_tables(topology_info, instance_rows,
                             variable_name, variable_value)
    instance_names = [instance["endpoint"] for instance in topology_info]

    with zf.open(make_zipinfo(f"{prefix}diff.global_variables.txt"), "w") as f:
        if not diff:
            return
        summary = f"{len(diff)} differing global variables between instances"
        dump_diff(f, "Variable", instance_names, diff, summary)


def process_path(path: str) -> Tuple[str, str]:
    """Resolve the output archive path and the entry prefix used inside it.

    - A path ending in a directory separator gets default_filename() appended.
    - Any other path lacking a ``.zip`` extension (case-insensitive) gets
      ``.zip`` appended.

    Returns ``(path, prefix)`` where prefix is the archive's base name without
    its last four characters, followed by ``/``.

    Raises Error if `path` is empty or the resulting file already exists.
    """
    if not path:
        raise Error("'path' cannot be an empty string")

    has_zip_extension = path.lower().endswith(".zip")
    if not has_zip_extension:
        # Backslash only counts as a directory separator on Windows.
        separators = ("/", "\\") if sys.platform == "win32" else ("/",)
        if path.endswith(separators):
            path = os.path.join(path, default_filename())
        else:
            path = path + ".zip"

    archive_name = os.path.basename(path.replace("\\", "/"))
    prefix = archive_name[:-4] + "/"

    if os.path.exists(path):
        raise Error(path+" already exists")

    return path, prefix


def do_collect_diagnostics(session_, path, orig_args,
                           innodbMutex=False,
                           allMembers=False,
                           schemaStats=False,
                           slowQueries=False,
                           ignoreErrors=False,
                           customSql: List[str] = [],
                           customShell: List[str] = []):
    """Collect a one-shot diagnostics archive for the connected server.

    The archive is written to `path` (resolved by process_path()) with mode
    0600. If anything fails, the partial archive is deleted and the error is
    re-raised.

    Options:
      innodbMutex, slowQueries, ignoreErrors: forwarded to the collectors
      allMembers: when InnoDB Cluster metadata is present, collect from every
        topology member (prompts for the account password)
      schemaStats: request full schema statistics
      customSql: extra SQL for the per-member collection; applied both to the
        single-instance and to the all-members collection
      customShell: extra shell commands; only allowed for a localhost target

    Raises Error for an unsupported server version (< 5.7), for slowQueries
    without slow_query_log=ON / log_output=TABLE, and for customShell against
    a non-local target.
    """
    path, prefix = process_path(path)

    session = InstanceSession(session_)
    if session.version < 50700:
        raise Error("MySQL 5.7 or newer required")

    if slowQueries:
        row = session.run_sql(
            "select @@slow_query_log, @@log_output").fetch_one()
        if not row[0] or row[1] != "TABLE":
            raise Error(
                "slowQueries option requires slow_query_log to be enabled and log_output to be set to TABLE")

    shell = globals.shell
    my_instance_id = session.instance_id

    creds = None
    if allMembers and my_instance_id:
        creds = shell.parse_uri(session.uri)
        creds["password"] = shell.prompt(
            f"Password for {creds['user']}: ", {"type": "password"})

    # Only a URI without a host, or with host "localhost", counts as local.
    target = shell.parse_uri(session.uri)
    local_target = True if "host" not in target else target["host"] == "localhost"
    if customShell and not local_target:
        raise Error(
            "Option 'customShell' is only allowed when connected to localhost")

    shell_exe = ShellExecutor(customShell, allow_phases=False)

    print(f"Collecting diagnostics information from {session.uri}...")
    try:
        with zipfile.ZipFile(path, mode="w") as zf:
            # Restrict access right away: the archive holds server internals.
            os.chmod(path, 0o600)
            if local_target:
                shell_exe.execute(zf, prefix)

            cluster_type = None
            if session.has_innodbcluster:
                print("InnoDB Cluster detected")
                cluster_type = collect_cluster_metadata(
                    zf, prefix, session, ignore_errors=ignoreErrors)
            collect_basic_info(zf, prefix, session,
                               info={"command": "collectDiagnostics",
                                     "commandOptions": normalize(orig_args)})
            collect_schema_stats(zf, prefix, session, full=schemaStats,
                                 ignore_errors=ignoreErrors)

            topology_data = None
            collected = False
            if session.has_innodbcluster and my_instance_id:
                print("Gathering grants for mysql_innodb_% accounts...")
                collect_innodb_cluster_accounts(zf, prefix, session,
                                                ignore_errors=ignoreErrors)

                print("Gathering cluster status...")
                dump_cluster_status(zf, prefix, cluster_type)

                if allMembers:
                    collected = True
                    # custom_sql is forwarded so that customSql is honoured
                    # with allMembers too, not silently dropped.
                    topology_data = collect_topology_diagnostics(
                        zf, prefix, session, creds, innodb_mutex=innodbMutex,
                        slow_queries=slowQueries, ignore_errors=ignoreErrors,
                        local_instance_id=my_instance_id if local_target else -1,
                        custom_sql=customSql)

                    analyze_topology_data(zf, prefix, topology_data)

            if not collected:
                collect_member(zf, f"{prefix}{my_instance_id}.",
                               session, slow_queries=slowQueries,
                               innodb_mutex=innodbMutex,
                               ignore_errors=ignoreErrors,
                               local_target=local_target,
                               custom_sql=customSql)

            # ping stats are only meaningful when run from the server's host
            if topology_data:
                if local_target:
                    print("Connected to local server, collecting ping stats...")
                    collect_ping_stats(zf, prefix, topology_data)
                else:
                    print(
                        "Connected to remote server, ping stats will not be collected.")

            # collect system info
            if local_target:
                collect_host_info(zf, prefix)
    except BaseException:
        # Includes KeyboardInterrupt on purpose: never leave a partial
        # archive behind. The exception is always re-raised.
        if os.path.exists(path):
            os.remove(path)
        print()
        print("An error occurred during data collection. Partial output deleted.")
        raise

    print()
    print(f"Diagnostics information was written to {path}")
    if not session.has_pfs:
        print("WARNING: performance_schema is disabled, collected a limited amount of information")
    if allMembers and not session.has_innodbcluster:
        print("NOTE: allMembers enabled, but InnoDB Cluster metadata not found")


def collect_common_high_load_data(zf: zipfile.ZipFile, prefix: str, session: InstanceSession, info: dict, local_target: bool):
    """Collect the data shared by the high-load and slow-query commands.

    Runs, in order: basic shell info (without shell logs), full schema
    statistics, instance info including slow queries and a BENCHMARK() run,
    and - for a local target only - host information. Errors are not ignored.
    """
    # general info; the shell log file is deliberately left out
    collect_basic_info(zf, prefix, session, info, False)

    collect_schema_stats(zf, prefix, session,
                         full=True, ignore_errors=False)

    collect_member_info(zf, prefix, session, True, False,
                        benchmark=True,
                        local_target=local_target)

    if not local_target:
        return

    # system info can only be gathered from the server's own host
    collect_host_info(zf, prefix)


def do_collect_high_load_diagnostics(session_, path, orig_args,
                                     innodbMutex=False,
                                     iterations=2,
                                     delay=5*60,
                                     pfsInstrumentation="current",
                                     customSql: List[str] = [],
                                     customShell: List[str] = []):
    """Collect a diagnostics archive sampled over several iterations.

    The archive is written to `path` (resolved by process_path()) with mode
    0600. If anything fails, the partial archive is deleted and the error is
    re-raised.

    Options:
      iterations: number of samples, must be >= 1
      delay: seconds between samples, must be >= 1
      pfsInstrumentation: one of "current", "medium", "full"
      innodbMutex, customSql, customShell: forwarded to collect_diagnostics();
        customShell is only allowed for a localhost target

    Raises Error for invalid option values, a server older than 5.7, or
    customShell against a non-local target.
    """
    path, prefix = process_path(path)

    if iterations < 1:
        raise Error(f"'iterations' must be > 0 (is {iterations})")
    if delay < 1:
        raise Error(f"'delay' must be > 0 (is {delay})")
    if pfsInstrumentation not in ("current", "medium", "full"):
        raise Error(
            "'pfsInstrumentation' must be one of current, medium, full")

    session = InstanceSession(session_)

    if session.version < 50700:
        raise Error("MySQL 5.7 or newer required")

    # Only a URI without a host, or with host "localhost", counts as local.
    target = globals.shell.parse_uri(session.uri)
    local_target = True if "host" not in target else target["host"] == "localhost"
    if customShell and not local_target:
        raise Error(
            "Option 'customShell' is only allowed when connected to localhost")

    print(f"Collecting diagnostics information from {session.uri}...")
    try:
        with zipfile.ZipFile(path, mode="w") as zf:
            # Restrict access right away: the archive holds server internals.
            os.chmod(path, 0o600)
            collect_common_high_load_data(zf, prefix, session,
                                          {"command": "collectHighLoadDiagnostics",
                                           "commandOptions": normalize(orig_args)},
                                          local_target)

            print()
            collect_diagnostics(zf, prefix, session,
                                iterations, delay, pfsInstrumentation,
                                innodb_mutex=innodbMutex,
                                custom_sql=customSql,
                                custom_shell=customShell)
    except BaseException:
        # Includes KeyboardInterrupt on purpose: never leave a partial
        # archive behind. The exception is always re-raised.
        if os.path.exists(path):
            os.remove(path)
        print()
        print("An error occurred during data collection. Partial output deleted.")
        raise

    print()
    # Reported once, with the same wording as the other collection commands.
    if not session.has_pfs:
        print("WARNING: performance_schema is disabled, collected a limited amount of information")
    if not local_target:
        print("NOTE: Target server is not at localhost, host information was not collected")
    print(f"Server load diagnostics information was written to {path}")


def extract_referenced_tables(session: InstanceSession, query: str) -> List[Tuple[str, str]]:
    """Return the tables referenced by `query`, found by walking its parse tree.

    The statement is parsed with mysql.parse_statement_ast(). Every node
    whose rule is ``tableRef`` contributes one entry built from the
    ``pureIdentifier`` nodes beneath it (back-tick quoted identifiers are
    unquoted). A reference with a single identifier is paired with the
    session's current schema (``select schema()``).
    """
    default_schema = session.run_sql("select schema()").fetch_one()[0]

    def identifier_parts(node):
        # Yields the identifier texts found under `node`, in tree order.
        if "rule" in node and node["rule"] == "pureIdentifier":
            token = node["children"][0]
            if token["symbol"] == "BACK_TICK_QUOTED_ID":
                yield mysql.unquote_identifier(token["text"])
            else:
                yield token["text"]
        elif "children" in node:
            for child in node["children"]:
                yield from identifier_parts(child)

    def table_refs(node):
        # Yields one entry per tableRef node; does not descend into a match.
        if "rule" in node and node["rule"] == "tableRef":
            parts = list(identifier_parts(node))
            assert parts
            if len(parts) == 1:
                # unqualified name: resolve against the current schema
                parts = default_schema, parts[0]
            yield parts
        elif "children" in node:
            for child in node["children"]:
                yield from table_refs(child)

    tree = mysql.parse_statement_ast(query)

    return list(table_refs(tree))


def execute_profiled_query(zf: zipfile.ZipFile, prefix: str, helper_session, query: str):
    """Run `query` on `helper_session` and return timing/result metadata.

    The full result set is fetched (and discarded) to measure the fetch time.
    Returns a dict with execution time, fetch time, the query text, warnings,
    affected and result row counts, plus ``Info`` when the result object
    offers ``get_info``. `zf` and `prefix` are accepted but not used here.
    """
    print(f"Executing query: {query}")
    result = helper_session.run_sql(query)
    print(f"Query finished in {result.get_execution_time()}")

    # Time only the retrieval of the rows, not the execution itself.
    started = time.time()
    row_count = len(result.fetch_all())
    finished = time.time()
    fetch_seconds = finished - started
    print(f"Results fetched in {fetch_seconds:.4f} sec")

    query_info = {}
    query_info["Execution Time"] = result.get_execution_time()
    query_info["Result Fetch Time"] = fetch_seconds
    query_info["Query"] = query
    query_info["Warnings"] = normalize(result.get_warnings())
    query_info["Affected Rows"] = result.get_affected_row_count()
    query_info["Result Rows"] = row_count
    if hasattr(result, "get_info"):
        query_info["Info"] = result.get_info()

    return query_info


def do_collect_slow_query_diagnostics(session_, path: str, query: str, orig_args,
                                      innodbMutex=False,
                                      delay=15,
                                      pfsInstrumentation="current",
                                      customSql: List[str] = [],
                                      customShell: List[str] = []):
    """Profile `query` while collecting server diagnostics in the background.

    The query is executed on a separate helper session opened with
    shell.open_session(), while a DiagnosticsSession samples the server every
    `delay` seconds. Table information for the tables referenced by the
    query, EXPLAIN output and the common high-load data are collected first.

    The archive is written to `path` (resolved by process_path()) with mode
    0600. If anything fails, the partial archive is deleted and the error is
    re-raised. The helper session is closed in every case.

    Raises Error for an empty query, invalid option values, a server older
    than 5.7, or customShell against a non-local target.
    """
    if not query:
        raise Error("'query' must contain the query to be analyzed")
    path, prefix = process_path(path)

    if delay < 1:
        raise Error(f"'delay' must be > 0 (is {delay})")
    if pfsInstrumentation not in ("current", "medium", "full"):
        raise Error(
            "'pfsInstrumentation' must be one of current, medium, full")

    session = InstanceSession(session_)
    if session.version < 50700:
        raise Error("MySQL 5.7 or newer required")

    # Only a URI without a host, or with host "localhost", counts as local.
    target = globals.shell.parse_uri(session.uri)
    local_target = True if "host" not in target else target["host"] == "localhost"
    if customShell and not local_target:
        raise Error(
            "Option 'customShell' is only allowed when connected to localhost")

    referenced_tables = extract_referenced_tables(session, query)

    helper_session = globals.shell.open_session()

    print(f"Collecting diagnostics information from {session.uri}...")
    try:
        with zipfile.ZipFile(path, mode="w") as zf:
            # Restrict access right away: the archive holds server internals.
            os.chmod(path, 0o600)
            shell_exe = ShellExecutor(customShell, allow_phases=True)
            custom = SQLExecutor(session, customSql, allow_phases=True)

            shell_exe.execute_before(zf, prefix)
            custom.execute_before(zf, prefix)

            for s, t in referenced_tables:
                print(
                    f"Collecting information for referenced table `{s}`.`{t}`")
                collect_table_info(zf, prefix, session, s, t)

            collect_common_high_load_data(zf, prefix, session,
                                          {"command": "collectSlowQueryDiagnostics",
                                           "commandOptions": normalize(orig_args)},
                                          local_target)

            print("Collecting EXPLAIN")
            explain_query(zf, session, query, prefix)

            if session.has_rapid:
                print("Collecting EXPLAIN with Heatwave")
                explain_heatwave_query(zf, session, query, prefix)

            diag = DiagnosticsSession(session, innodb_mutex=innodbMutex)
            diag.collect_configs_and_state(zf, prefix)
            try:
                diag.start(zf, f"{prefix}diagnostics",
                           pfs_instrumentation=pfsInstrumentation)

                print("Starting background diagnostics collector...")

                def on_iterate(i):
                    shell_exe.execute_during(zf, prefix, i)
                    custom.execute_during(zf, prefix, i)

                diag.start_background(delay, zf, f"{prefix}diagnostics",
                                      on_iterate)

                try:
                    query_info = execute_profiled_query(
                        zf, prefix, helper_session, query)
                except BaseException:
                    # stop the collector before cleanup runs in the outer handler
                    diag.stop_background()
                    raise
            except BaseException:
                diag.cleanup(zf, prefix)
                raise

            diag.stop_background()

            diag.finish(zf, f"{prefix}diagnostics")

            # make_zipinfo() is used here like for every other archive entry.
            with zf.open(make_zipinfo(f"{prefix}query-info.yaml"), "w") as f:
                f.write(yaml.dump(query_info).encode("utf-8"))
                f.write(b"\n")

            shell_exe.execute_after(zf, prefix)
            custom.execute_after(zf, prefix)
    except BaseException:
        # Includes KeyboardInterrupt on purpose: never leave a partial
        # archive behind. The exception is always re-raised.
        if os.path.exists(path):
            os.remove(path)
        print()
        print("An error occurred during query profiling. Partial output deleted.")
        raise
    finally:
        # Single close point for both the success and the failure path.
        helper_session.close()

    print()
    if not session.has_pfs:
        print("WARNING: performance_schema is disabled, collected a limited amount of information")
    if not referenced_tables:
        print("NOTE: could not extract list of referenced tables from query")
    if not local_target:
        print("NOTE: Target server is not at localhost, host information was not collected")
    print(f"Server and query diagnostics information was written to {path}")

Youez - 2016 - github.com/yon3zu
LinuXploit