403Webshell
Server IP : 103.119.228.120  /  Your IP : 18.224.59.107
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /usr/local/ssl/local/ssl/local/ssl/local/ssl/local/ssl/lib/mysqlsh/plugins/debug/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /usr/local/ssl/local/ssl/local/ssl/local/ssl/local/ssl/lib/mysqlsh/plugins/debug/sql_collector.py
# Copyright (c) 2021, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation.  The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful,  but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

from multiprocessing import Condition
from threading import Thread
from mysqlsh import mysql, Error
from typing import Callable, List, Optional, Tuple
import yaml
import json
import datetime
import zipfile

from .host_info import ShellExecutor, split_phases


def make_zipinfo(path: str) -> zipfile.ZipInfo:
    return zipfile.ZipInfo(path,
                           date_time=(tuple(
                               datetime.datetime.now().timetuple())))


def sanitize(s):
    return s.replace(' ', '_').replace(':',
                                       '_').replace('\\',
                                                    '_').replace('/', '_')


def format_bytes(n):
    if n is None:
        return None
    if n < 1024:
        return f"{int(n)} bytes"
    elif n < 1024 * 1024:
        return f"{n/1024:0.2f} KiB"
    elif n < 1024 * 1024 * 1024:
        return f"{n/(1024*1024):0.2f} MiB"
    elif n < 1024 * 1024 * 1024 * 1024:
        return f"{n/(1024*1024*1024):0.2f} GiB"
    else:
        return f"{n/(1024*1024*1024*1024):0.2f} TiB"


def format_pico_time(picoseconds):
    if picoseconds is None:
        return None
    elif picoseconds >= 604800000000000000:
        return f'{picoseconds / 604800000000000000:0.2f} w'
    elif picoseconds >= 86400000000000000:
        return f'{picoseconds / 86400000000000000:0.2f} d'
    elif picoseconds >= 3600000000000000:
        return f'{picoseconds / 3600000000000000:0.2f} h'
    elif picoseconds >= 60000000000000:
        return f'{picoseconds / 60000000000000:0.2f} m'
    elif picoseconds >= 1000000000000:
        return f'{picoseconds / 1000000000000:0.2f} s'
    elif picoseconds >= 1000000000:
        return f'{picoseconds / 1000000000:0.2f} ms'
    elif picoseconds >= 1000000:
        return f'{picoseconds / 1000000:0.2f} us'
    elif picoseconds >= 1000:
        return f'{picoseconds / 1000:0.2f} ns'
    else:
        return f'{picoseconds} ps'


class InstanceSession:

    def __init__(self, session):
        self.session = session

        self.has_pfs, version = self.run_sql(
            "select @@performance_schema <> 'OFF', @@version").fetch_one()
        if "-" in version:
            version = version.split("-")[0]
        a, b, c = version.split(".")
        self.version = int(a) * 10000 + int(b) * 100 + int(c)

        self.pfs_tables = [
            t[0] for t in self.run_sql(
                "show tables in performance_schema").fetch_all()
        ]
        self.is_tables = [
            t[0] for t in self.run_sql(
                "show tables in information_schema").fetch_all()
        ]
        self.sys_tables = [
            r[0] for r in self.run_sql("show tables in sys").fetch_all()
        ]
        self.mysql_tables = [
            r[0] for r in self.run_sql("show tables in mysql").fetch_all()
        ]

        supported_engines = [
            r[0] for r in self.run_sql(
                "select engine from information_schema.engines where support<>'NO'"
            ).fetch_all()
        ]

        self.has_ndb = "NDBCluster" in supported_engines
        self.has_rapid = "rpd_nodes" in self.pfs_tables

        if self.session.run_sql(
                "show schemas like 'mysql_innodb_cluster_metadata'").fetch_one(
        ):
            self.has_innodbcluster = True
            try:
                self.instance_id = self.session.run_sql(
                    "select instance_id from mysql_innodb_cluster_metadata.v2_this_instance").fetch_one()[0]
            except:
                self.instance_id = self.session.run_sql(
                    "select instance_id from mysql_innodb_cluster_metadata.instances where cast(mysql_server_uuid as binary)=cast(@@server_uuid as binary)"
                ).fetch_one()[0]
        else:
            self.has_innodbcluster = False
            self.instance_id = 0

    def __str__(self):
        return self.session.uri

    @property
    def uri(self) -> str:
        return self.session.uri

    def run_sql(self, sql: str, args: list = []):
        try:
            return self.session.run_sql(sql, args)
        except Exception as e:
            print("ERROR running query: ", sql, str(e))
            raise


def write_tsv(zf: zipfile.ZipFile, fn: str, tsv_out: list, header: str = ""):

    def write(f):
        if header:
            f.write(header.encode("utf-8"))
        for line in tsv_out:
            f.write(line.encode("utf-8"))
            f.write(b"\n")

    with zf.open(make_zipinfo(fn + ".tsv"), "w") as f:
        write(f)


def dump_query(zf: zipfile.ZipFile,
               fn: str,
               session: InstanceSession,
               query: str,
               args: List[str] = [],
               *,
               as_yaml: bool = True,
               as_tsv: bool = True,
               filter: Optional[Callable] = None,
               formatters: List[Callable] = [],
               ignore_errors: bool = True,
               include_warnings: bool = False) -> list:

    def handle_error(e):
        if ignore_errors:
            with zf.open(make_zipinfo(fn + ".error"), "w") as f:
                f.write(f"{header}\n# {e}\n".encode("utf-8"))
            return []
        print(f'ERROR: While executing "{query}": {e}')
        raise

    header = "# Query:\n" + \
        "\n".join([f"#\t{l}" for l in query.split("\n")]) + "\n"
    header += "#\n"
    header += f"# Started: {datetime.datetime.now().isoformat()}\n"
    try:
        r = session.run_sql(query, args)
        execution_time = r.get_execution_time()
    except Exception as e:
        return handle_error(e)

    raw_out = []

    yaml_out = []
    tsv_out = []

    if as_tsv:
        line = []
        for c in r.columns:
            line.append(c.column_label)
        tsv_out.append("# " + "\t".join(line))

    try:
        for row in iter(r.fetch_one, None):
            line = []
            if filter:
                row = filter(row)
                if not row:
                    continue
            entry = {}
            raw_entry = {}
            for i in range(len(row)):
                field = row[i]
                raw_entry[r.columns[i].column_label] = field
                if r.columns[i].column_label in formatters:
                    field = formatters[i](field)
                line.append(str(field) if field is not None else "NULL")
                if type(field) == str:
                    if '"' in field:
                        field = '"' + field.replace('"', '\\"') + '"'
                    entry[r.columns[i].column_label] = field
                elif type(field) in (int, str, bool, type(None), float):
                    entry[r.columns[i].column_label] = field
                else:
                    entry[r.columns[i].column_label] = str(field)
            if as_tsv:
                tsv_out.append("\t".join(line))
            yaml_out.append(entry)
            raw_out.append(raw_entry)
    except Exception as e:
        return handle_error(e)

    if include_warnings:
        warnings = session.run_sql("SHOW WARNINGS").fetch_all()
        if warnings:
            yaml_out.append({"Warnings": warnings})
            if as_tsv:
                tsv_out.append("# Warnings")
                for w in warnings:
                    tsv_out.append("\t".join([str(f) for f in w]))

    header += f"# Execution Time: {execution_time}\n#\n"

    if as_tsv:
        write_tsv(zf, fn, tsv_out, header=header)

    if as_yaml:
        with zf.open(make_zipinfo(fn + ".yaml"), "w") as f:
            f.write(header.encode("utf-8"))
            f.write(yaml.dump_all(yaml_out).encode("utf-8"))

    return raw_out


def dump_table(zf: zipfile.ZipFile,
               fn: str,
               session: InstanceSession,
               table,
               *,
               as_yaml=False,
               filter=None,
               ignore_errors=True):
    return dump_query(zf,
                      fn,
                      session,
                      f"select * from {table}",
                      filter=filter,
                      as_yaml=as_yaml,
                      ignore_errors=ignore_errors)


def collect_tables(zf: zipfile.ZipFile,
                   prefix: str,
                   session: InstanceSession,
                   tables: List[str],
                   *,
                   as_yaml: bool = False,
                   ignore_errors: bool = True) -> dict:
    info = {}
    for table in tables:
        print(f" - Gathering {table}...")
        info[table] = dump_table(zf,
                                 f"{prefix}{sanitize(table)}",
                                 session,
                                 table,
                                 as_yaml=as_yaml,
                                 ignore_errors=ignore_errors)
    return info


def collect_queries(zf: zipfile.ZipFile,
                    prefix: str,
                    session: InstanceSession,
                    queries: list,
                    *,
                    as_yaml=False,
                    ignore_errors: bool = True,
                    include_warnings: bool = False):
    info = {}
    for q in queries:
        if type(q) is tuple:
            if len(q) == 2:
                label, query = q
                filter = None
            else:
                label, query, filter = q
        else:
            label, query, filter = q, q, None
        print(f" - Gathering {label}...")
        try:
            info[label.replace(' ', '_')] = dump_query(
                zf,
                f"{prefix}{sanitize(label)}",
                session,
                query,
                as_yaml=as_yaml,
                ignore_errors=ignore_errors,
                include_warnings=include_warnings,
                filter=filter)
        except Error as e:
            if e.code not in (mysql.ErrorCode.ER_NO_BINARY_LOGGING, ):
                raise
    return info


def collect_queries_single_file(zf: zipfile.ZipFile, fn: str,
                                session: InstanceSession, queries: list):
    with zf.open(make_zipinfo(fn), "w") as f:
        for query in queries:
            print(f" - Executing {query}...")

            header = "# Query:\n" + \
                "\n".join([f"#\t{l}" for l in query.split("\n")]) + "\n"
            f.write(header.encode("utf-8"))
            try:
                res = session.run_sql(query)
                f.write(
                    f"# Execution Time: {res.get_execution_time()}\n\n".encode(
                        "utf-8"))
                f.write(b"\n")
                f.write(
                    ("# " + "\t".join([c.column_label for c in res.columns]) +
                     "\n").encode("utf-8"))

                line = []
                for row in iter(res.fetch_one, None):
                    for i in range(len(row)):
                        field = row[i]
                        line.append(
                            str(field) if field is not None else "NULL")
                line = "\t".join(line)
                f.write(f"{line}\n".encode("utf-8"))
            except Error as e:
                f.write(f"# Error: {e}\n".encode("utf-8"))
                print(f"ERROR: {e}")


k_sys_views_delta = [
    # TABLE_NAME, order_by, order_by_delta, where_delta, limit_rows, pk
    ('host_summary', '%{TABLE}.statement_latency DESC',
     lambda s, e: (e["statement_latency"]-(s["statement_latency"] or 0)),
     lambda s, e: e["statements"] - (s["statements"] or 0) > 0, None, 'host'),
    ('host_summary_by_file_io', '%{TABLE}.io_latency DESC',
     '(e.io_latency-IFNULL(s.io_latency, 0)) DESC',
     lambda s, e: (e["ios"] - (s["ios"] or 0)) > 0, None, 'host'),
    ('host_summary_by_file_io_type', '%{TABLE}.host, %{TABLE}.total_latency DESC',
     'e.host, (e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'host,event_name'),
    ('host_summary_by_stages', '%{TABLE}.host, %{TABLE}.total_latency DESC',
     'e.host, (e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'host,event_name'),
    ('host_summary_by_statement_latency', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'host'),
    ('host_summary_by_statement_type', '%{TABLE}.host, %{TABLE}.total_latency DESC',
     'e.host, (e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'host,statement'),
    ('io_by_thread_by_latency', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'user,thread_id,processlist_id'),
    ('io_global_by_file_by_bytes', '%{TABLE}.total DESC',
     '(e.total-IFNULL(s.total, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, 100, 'file'),
    ('io_global_by_file_by_latency', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, 100, 'file'),
    ('io_global_by_wait_by_bytes', '%{TABLE}.total_requested DESC',
     '(e.total_requested-IFNULL(s.total_requested, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'event_name'),
    ('io_global_by_wait_by_latency', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'event_name'),
    ('schema_index_statistics', '(%{TABLE}.select_latency+%{TABLE}.insert_latency+%{TABLE}.update_latency+%{TABLE}.delete_latency) DESC',
     '((e.select_latency+e.insert_latency+e.update_latency+e.delete_latency)-IFNULL(s.select_latency+s.insert_latency+s.update_latency+s.delete_latency, 0)) DESC',
     lambda s, e: ((e["rows_selected"]+e["insert_latency"]+e["rows_updated"]+e["rows_deleted"]) - \
                   (s["rows_selected"]+s["rows_inserted"]+s["rows_updated"]+s["rows_deleted"] or 0)) > 0,
     100, 'table_schema,table_name,index_name'),
    ('schema_table_statistics', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total_latency"]-(s["total_latency"] or 0)) > 0, 100, 'table_schema,table_name'),
    ('schema_tables_with_full_table_scans', '%{TABLE}.rows_full_scanned DESC',
     '(e.rows_full_scanned-IFNULL(s.rows_full_scanned, 0)) DESC',
     lambda s, e: (e["rows_full_scanned"]-(s["rows_full_scanned"] or 0)) > 0, 100, 'object_schema,object_name'),
    ('user_summary', '%{TABLE}.statement_latency DESC',
     '(e.statement_latency-IFNULL(s.statement_latency, 0)) DESC',
     lambda s, e: (e["statements"] - (s["statements"] or 0)) > 0, None, 'user'),
    ('user_summary_by_file_io', '%{TABLE}.io_latency DESC',
     '(e.io_latency-IFNULL(s.io_latency, 0)) DESC',
     lambda s, e: (e["ios"] - (s["ios"] or 0)) > 0, None, 'user'),
    ('user_summary_by_file_io_type', '%{TABLE}.user, %{TABLE}.latency DESC',
     'e.user, (e.latency-IFNULL(s.latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'user,event_name'),
    ('user_summary_by_stages', '%{TABLE}.user, %{TABLE}.total_latency DESC',
     'e.user, (e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'user,event_name'),
    ('user_summary_by_statement_latency', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'user'),
    ('user_summary_by_statement_type', '%{TABLE}.user, %{TABLE}.total_latency DESC',
     'e.user, (e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'user,statement'),
    ('wait_classes_global_by_avg_latency', 'IFNULL(%{TABLE}.total_latency / NULLIF(%{TABLE}.total, 0), 0) DESC',
     'IFNULL((e.total_latency-IFNULL(s.total_latency, 0)) / NULLIF((e.total - IFNULL(s.total, 0)), 0), 0) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'event_class'),
    ('wait_classes_global_by_latency', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'event_class'),
    ('waits_by_host_by_latency', '%{TABLE}.host, %{TABLE}.total_latency DESC',
     'e.host, (e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'host,event'),
    ('waits_by_user_by_latency', '%{TABLE}.user, %{TABLE}.total_latency DESC',
     'e.user, (e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'user,event'),
    ('waits_global_by_latency', '%{TABLE}.total_latency DESC',
     '(e.total_latency-IFNULL(s.total_latency, 0)) DESC',
     lambda s, e: (e["total"] - (s["total"] or 0)) > 0, None, 'events')
]


class SQLExecutor:

    def __init__(self,
                 session: InstanceSession,
                 custom_sql: List[str] = [],
                 allow_phases: bool = True):
        self.session = session
        phases = split_phases(custom_sql)
        if not allow_phases:
            if phases[1] or phases[2]:
                raise Error(
                    "Option 'customSql' may not contain before:, during: or after: prefixes"
                )

        self.custom_sql = []
        for l in phases:
            self.custom_sql.append([(f"script_{i}", sql)
                                    for i, sql in enumerate(l)])

    def execute(self, zf: zipfile.ZipFile, prefix: str):
        l = self.custom_sql[0]
        if l:
            print(f"Executing custom sql scripts")
            collect_queries(zf,
                            f"{prefix}custom_sql-",
                            self.session,
                            l,
                            ignore_errors=False)

    def execute_before(self, zf: zipfile.ZipFile, prefix: str):
        l = self.custom_sql[0]
        if l:
            print(f"Executing custom 'before' sql scripts")
            collect_queries(zf,
                            f"{prefix}custom_sql-before-",
                            self.session,
                            l,
                            ignore_errors=False)

    def execute_during(self, zf: zipfile.ZipFile, prefix: str, iteration: int):
        l = self.custom_sql[1]
        if l:
            print(f"Executing custom 'during' iteration sql scripts")
            collect_queries(zf,
                            f"{prefix}custom_sql-iteration{iteration}-",
                            self.session,
                            l,
                            ignore_errors=False)

    def execute_after(self, zf: zipfile.ZipFile, prefix: str):
        l = self.custom_sql[2]
        if l:
            print(f"Executing custom 'after' sql scripts")
            collect_queries(zf,
                            f"{prefix}custom_sql-after-",
                            self.session,
                            l,
                            ignore_errors=False)


class DiagnosticsSession:

    def __init__(self, session: InstanceSession, innodb_mutex: bool):
        self.session = session
        self.pfs_instrumentation = None
        self.metrics = {}
        self.metrics_info = {}
        self.status_start = {}
        self.status_end = {}

        self.pfs_instrumentation_changed = False
        self.has_performance_analyzer = False

        self.innodb_mutex = innodb_mutex

        if self.session.has_pfs:
            self.pfs_events_wait_history_long = self.session.run_sql(
                "select sys.ps_is_consumer_enabled('events_waits_history_long') = 'YES'"
            ).fetch_one()[0]
            self.pfs_memory_instrumented = self.session.run_sql(
                "select EXISTS(SELECT 1 FROM performance_schema.setup_instruments WHERE NAME LIKE 'memory/%' AND ENABLED = 'YES')"
            ).fetch_one()[0]
        else:
            self.pfs_events_wait_history_long = False
            self.pfs_memory_instrumented = False

        if self.session.has_ndb:
            self.ndbinfo_tables = [
                "ndbinfo." + t[0] for t in self.session.run_sql(
                    "show tables in ndbinfo").fetch_all()
            ]
        else:
            self.ndbinfo_tables = []

        self.session.run_sql("set session group_concat_max_len=2048")

        # disable binlog if it's enabled and not RBR, so that temp tables created
        # by sys SPs don't get replicated
        log_bin, binlog_format = self.session.run_sql(
            "select @@sql_log_bin, @@binlog_format").fetch_one()
        if log_bin and binlog_format != "ROW" and self.session.has_pfs:
            self.disabled_binlog = True
            self.session.run_sql("SET SESSION sql_log_bin = 0")
        else:
            self.disabled_binlog = False

    def start(self, zf: zipfile.ZipFile, prefix: str,
              pfs_instrumentation: str):
        assert pfs_instrumentation in ("current", "medium", "full")

        if self.session.has_pfs:
            self.enable_pfs_instruments(pfs_instrumentation)

            self.collect_pfs_config(
                zf, f"{prefix}-applied_{pfs_instrumentation}-")
        else:
            if pfs_instrumentation != "current":
                raise Error(
                    "performance_schema is disabled, instrumentation cannot be changed"
                )

        self.status_start = self.fetch_status("start", zf, prefix)

        self.collect_metrics(zf, prefix, 0)

        if self.session.has_pfs:
            print(" - Preparing statement performance analyzer...")
            try:
                for q in [
                        "DROP TEMPORARY TABLE IF EXISTS sys.tmp_digests_start",
                        "CALL sys.statement_performance_analyzer('create_tmp', 'tmp_digests_start', NULL)",
                        "CALL sys.statement_performance_analyzer('snapshot', NULL, NULL)",
                        "CALL sys.statement_performance_analyzer('save', 'tmp_digests_start', NULL)"
                ]:
                    self.session.run_sql(q)
                self.has_performance_analyzer = True
            except Exception as e:
                print(f"Error executing {q}: {e}")
                print("Related checks will be skipped")
                self.has_performance_analyzer = False

    def iterate(self, zf: zipfile.ZipFile, prefix: str, iteration: int):
        self.collect_metrics(zf, prefix, iteration)
        self.collect_other_stats(zf, prefix, iteration)

    def start_background(self, delay: int, zf: zipfile.ZipFile, prefix: str,
                         iter_fn: Callable):

        def runloop(cond):
            cond.acquire()
            self.end_thread = False
            cond.notify()
            cond.release()
            i = 0
            while not self.end_thread:
                print("Collecting metrics...")
                self.iterate(zf, prefix, i + 1)

                iter_fn(i)

                i += 1
                # use SQL sleep because it can be interrupted
                for _ in range(delay):
                    self.session.run_sql("select sleep(1)")
                    if self.end_thread:
                        break

        self.end_thread = None
        cond = Condition()

        self.thread = Thread(target=runloop, args=(cond, ))
        self.thread.start()

        while self.end_thread is None:
            cond.acquire()
            cond.wait()
            cond.release()

    def stop_background(self):
        self.end_thread = True
        self.thread.join()

    def finish(self, zf: zipfile.ZipFile, prefix: str):
        if self.session.has_pfs:
            print(" - Writing delta of collected metrics...")
            self.dump_metrics_delta(zf, prefix)

            self.status_end = self.fetch_status("end", zf, prefix)
            self.dump_status_delta(zf, prefix)

            for q in ["CALL sys.ps_statement_avg_latency_histogram()"]:
                self.session.run_sql(q)

            if self.has_performance_analyzer:
                dump_query(
                    zf,
                    f"{prefix}-statement_performance_analyzer-delta",
                    self.session,
                    "CALL sys.statement_performance_analyzer('delta', 'tmp_digests_start', 'with_runtimes_in_95th_percentile')",
                    as_yaml=False)

                for q in [
                        "DROP TEMPORARY TABLE IF EXISTS sys.tmp_digests_start",
                        "CALL sys.statement_performance_analyzer('cleanup', NULL, NULL)"
                ]:
                    self.session.run_sql(q)

        self.cleanup(zf, prefix)

    def cleanup(self, zf: zipfile.ZipFile, prefix: str):
        if self.session.has_pfs:
            self.restore_pfs_instruments()

        if self.disabled_binlog:
            self.session.run_sql("SET sql_log_bin = 1")

    def enable_pfs_instruments(self, level):
        self.pfs_instrumentation = level

        self.session.run_sql(
            "CALL sys.ps_setup_disable_thread(CONNECTION_ID())")

        if level == "current":
            c = self.session.run_sql(
                "select count(*) from performance_schema.setup_consumers where enabled='YES'"
            ).fetch_one()[0]
            if c == 0:
                print(
                    "WARNING: performance_schema.setup_consumers is completely disabled."
                )

            if self.session.version >= 80000:
                c = self.session.run_sql(
                    "select count(*) from performance_schema.setup_threads where enabled='YES'"
                ).fetch_one()[0]
                if c == 0:
                    print(
                        "WARNING: performance_schema.setup_threads is completely disabled."
                    )
        else:
            print("Configuring Performance Schema Instrumentation")
            self.session.run_sql("CALL sys.ps_setup_save(0)")
            self.pfs_instrumentation_changed = True

            if level == "medium":
                # Enable all consumers except % history and %history_long
                self.session.run_sql(
                    """UPDATE performance_schema.setup_consumers
                    SET ENABLED = 'YES'
                WHERE NAME NOT LIKE '%\\_history%'""")

                # Enable all instruments except wait/synch/%
                self.session.run_sql(
                    """UPDATE performance_schema.setup_instruments
                    SET ENABLED = 'YES',
                        TIMED = 'YES'
                WHERE NAME NOT LIKE 'wait/synch/%'""")
            elif level == "full":
                self.session.run_sql(
                    """UPDATE performance_schema.setup_consumers
                    SET ENABLED = 'YES'""")

                self.session.run_sql(
                    """UPDATE performance_schema.setup_instruments
                    SET ENABLED = 'YES',
                        TIMED = 'YES'""")

            # Enable all threads except this one
            self.session.run_sql("""UPDATE performance_schema.threads
            SET INSTRUMENTED = 'YES'
            WHERE PROCESSLIST_ID <> CONNECTION_ID()""")

    def restore_pfs_instruments(self):
        if self.pfs_instrumentation != "current" and self.pfs_instrumentation_changed:
            print("Restoring Performance Schema Configurations")
            self.session.run_sql("CALL sys.ps_setup_reload_saved()")
            self.pfs_instrumentation_changed = False

    def fetch_status(self, stage: str, zf: zipfile.ZipFile, prefix: str):

        def formatters_for_table(table: str) -> List[Callable]:
            """
            Assemble a query on a sys view, formatting columns to be human readable.
            """
            columns = self.session.run_sql(
                "select column_name from information_schema.columns where table_schema='sys' and table_name=? order by ordinal_position",
                [table]).fetch_all()
            formatters = []
            for column, in columns:
                lcolumn = column.lower()
                if (table == "io_global_by_file_by_bytes" and lcolumn
                        == "total") or (table == "io_global_by_wait_by_bytes"
                                        and lcolumn == "total_requested"):
                    formatters.append(format_bytes)
                elif lcolumn.endswith("latency"):
                    formatters.append(format_pico_time)
                elif (lcolumn.endswith("_memory")
                      or lcolumn.endswith("_memory_allocated") or
                      lcolumn.endswith("_read") or lcolumn.endswith("_written")
                      or lcolumn.endswith("_write")
                      ) and not lcolumn.startswith("count_"):
                    formatters.append(format_bytes)
                else:
                    formatters.append(lambda f: f)
            return formatters

        status = {}
        for table_name, order_by, *_ in k_sys_views_delta:
            assert order_by
            order_by = " ORDER BY " + \
                order_by.replace("%{TABLE}", f"x${table_name}")

            query = f"SELECT * FROM `sys`.`x${table_name}`{order_by or ''}"
            formatters = formatters_for_table(table_name)

            print(f" - Gathering sys.{table_name}...")
            status[table_name] = dump_query(
                zf,
                f"{prefix}-raw/{stage}.{table_name}",
                self.session,
                query,
                as_yaml=False,
                formatters=formatters)

        return status

    def dump_status_delta(self, zf: zipfile.ZipFile, prefix: str):

        def formatters_for_table(table_name: str,
                                 pk: str) -> Tuple[List[str], List[Callable]]:
            columns = [
                r[0] for r in self.session.run_sql(
                    "select column_name from information_schema.columns where table_schema='sys' and table_name=? order by ordinal_position",
                    [table_name]).fetch_all()
            ]
            fcolumns = []

            def float_or_none(s):
                if s is None:
                    return s
                return float(s)

            for column in columns:
                lcolumn = column.lower()

                if "," + lcolumn + "," in "," + pk + ",":
                    fcolumns.append(lambda s, e: f"{s[column]}")
                elif table_name == "io_global_by_file_by_bytes" and lcolumn == "write_pct":

                    def fmt(s, e):
                        e_read = float(e["total_read"] or 0)
                        s_read = float_or_none(s["total_read"])
                        e_written = float(e["total_written"] or 0)
                        s_written = float_or_none(s["total_written"])
                        if s_read is None or s_written is None or (
                                e_read - s_read) + (e_written -
                                                    s_written) == 0:
                            return None

                        return f"{100 - ((e_read-s_read) / ((e_read - s_read) + (e_written - s_written))) * 100:.2f}"

                    fcolumns.append(fmt)
                elif (table_name, lcolumn) in [
                    ("io_global_by_file_by_bytes", "total"),
                    ("io_global_by_wait_by_bytes", "total_requested")
                ]:

                    def fmt(s, e):
                        e = float_or_none(e[column])
                        s = float(s[column] or 0)
                        if e is None:
                            return None
                        return format_bytes(e - s)

                    fcolumns.append(fmt)
                elif lcolumn[:4] in ('max_',
                                     'min_') and lcolumn.endswith('_latency'):
                    fcolumns.append(lambda s, e: (format_pico_time(
                        float(e[column])) if e[column] is not None else None))
                elif lcolumn == 'avg_latency':

                    def fmt(s, e):
                        e_lat = float(e["total_latency"] or 0)
                        s_lat = float_or_none(s["total_latency"])
                        e_total = float(e["total"] or 0)
                        s_total = float_or_none(s["total"])
                        if s_lat is None or s_total is None or e_total - s_total == 0:
                            return None
                        return format_pico_time(
                            (e_lat - s_lat) / (e_total - s_total))

                    fcolumns.append(fmt)
                elif lcolumn.endswith('_avg_latency'):
                    prefix = column[:-12]

                    def fmt(s, e):
                        e_lat = float(e[f"{prefix}_latency"] or 0)
                        s_lat = float_or_none(s[f"{prefix}_latency"])
                        e_total = float(e[f"{prefix}s"] or 0)
                        s_total = float_or_none(s[f"{prefix}s"])
                        if s_lat is None or s_total is None or e_total - s_total == 0:
                            return None
                        return format_pico_time(
                            (e_lat - s_lat) / (e_total - s_total))

                    fcolumns.append(fmt)
                elif column.endswith('latency'):
                    fcolumns.append(lambda s, e: (format_pico_time(
                        (float(e[column]) - float(s[column] or 0))
                        if e[column] is not None else None)))
                elif column in ('avg_read', 'avg_write', 'avg_written'):
                    suffix = "read" if lcolumn == "avg_read" else "written"
                    suffix2 = "read" if lcolumn == "avg_read" else "write"

                    def fmt(s, e):
                        e_total = float(s[f"total_{suffix}"] or 0)
                        s_total = float_or_none(s[f"total_{suffix}"])
                        e_count = float(e[f"count_{suffix2}"] or 0)
                        s_count = float_or_none(s[f"count_{suffix2}"])
                        if s_total == None or s_count is None:
                            return None
                        n = e_total - s_total
                        d = e_count - s_count
                        if d == 0:
                            return None
                        return format_bytes(n / d)

                    fcolumns.append(fmt)
                elif lcolumn.endswith("_memory") or lcolumn.endswith(
                        '_memory_allocated'
                ) or (lcolumn.endswith("_read") or lcolumn.endswith('_written')
                      or lcolumn.endswith("_write")
                      ) and not column.startswith("count_"):

                    def fmt(s, e):
                        e_ = float_or_none(e[column])
                        s_ = float(s[column] or 0)

                        if e_ is None:
                            return None

                        return format_bytes(e_ - s_)

                    fcolumns.append(fmt)
                else:

                    def fmt(s, e):
                        s_ = float(s[column] or 0)
                        e_ = float_or_none(e[column])
                        if e_ is None:
                            return None
                        return f"{e_ - s_}"

                    fcolumns.append(fmt)

            return columns, fcolumns

        for table_name, *_, pk in k_sys_views_delta:
            columns, formatters = formatters_for_table(table_name, pk)

            lines = []
            lines.append("# " + "\t".join(columns))

            for start_row, end_row in zip(self.status_start[table_name],
                                          self.status_end[table_name]):
                entry = []
                for i, fmt in enumerate(formatters):
                    try:
                        entry.append(fmt(start_row, end_row))
                    except:
                        print(columns[i], start_row, end_row)
                        raise
                lines.append("\t".join([s or "-" for s in entry]))

            write_tsv(zf, f"{prefix}-delta.{table_name}", lines)

    def collect_metrics(self, zf: zipfile.ZipFile, prefix: str,
                        iteration: Optional[int]):
        if not self.session.has_pfs:
            return
        if iteration is None:
            fn = f"{prefix}metrics"
        else:
            fn = f"{prefix}-raw/iteration-{iteration}.metrics"
        metrics = dump_query(
            zf,
            fn,
            self.session,
            "SELECT Variable_name, REPLACE(Variable_value, '\n', '\\\\n') AS Variable_value, Type, Enabled FROM sys.metrics",
            as_yaml=False,
            include_warnings=False)

        for row in metrics:
            name = row["Variable_name"]
            value = row["Variable_value"]
            type = row["Type"]
            enabled = row["Enabled"]
            if "name" not in self.metrics_info:
                self.metrics_info[name] = [type, enabled]
            self.metrics.setdefault(name, []).append(value)

    def collect_configs_and_state(self, zf: zipfile.ZipFile, prefix: str):
        other_pfs_tables = ["host_cache", "persisted_variables"]
        is_tables = ["plugins"]
        mysql_tables = ["audit_log_user", "audit_log_filter"]

        tables = [
            f"performance_schema.{t}" for t in self.session.pfs_tables
            if t.startswith("replication_") or t in other_pfs_tables
        ]
        tables += [
            f"information_schema.{t}" for t in self.session.is_tables
            if t in is_tables
        ]
        tables += [
            f"mysql.{t}" for t in self.session.mysql_tables
            if t in mysql_tables
        ]

        if self.session.has_ndb:
            tables.append("ndbinfo.threadblocks")

        if self.session.has_rapid:
            for t in ["rpd_nodes", "rpd_exec_stats", "rpd_query_stats"]:
                if t in self.session.pfs_tables:
                    tables.append("performance_schema." + t)

        def filter_slave_master_info(row):
            row = list(row)
            row[5] = "*****"
            return row

        if self.session.version >= 80023:
            kw_replica = "REPLICA"
            kw_replicas = "REPLICAS"
        else:
            kw_replica = "SLAVE"
            kw_replicas = "SLAVE HOSTS"

        queries = [
            ("global variables",
             """SELECT g.variable_name name, g.variable_value value /*!80000, i.variable_source source*/
            FROM performance_schema.global_variables g
            /*!80000 JOIN performance_schema.variables_info i ON g.variable_name = i.variable_name */
            ORDER BY name"""),
            "XA RECOVER CONVERT xid",

            # replication configuration
            "SHOW BINARY LOGS",
            f"SHOW {kw_replicas}",
            "SHOW MASTER STATUS",
            f"SHOW {kw_replica} STATUS",
            ("replication master_info",
             """SELECT * FROM mysql.slave_master_info ORDER BY Channel_name""",
             filter_slave_master_info),
            ("replication relay_log_info",
             """SELECT Channel_name, Sql_delay, Number_of_workers, Id
                FROM mysql.slave_relay_log_info ORDER BY Channel_name""")
        ]

        if self.session.has_rapid:
            queries += [
                ("rapid table status",
                 """SELECT rpd_table_id.ID, rpd_table_id.Name, rpd_tables.*
                FROM performance_schema.rpd_table_id, performance_schema.rpd_tables
                WHERE rpd_tables.ID = rpd_table_id.ID
                ORDER BY rpd_table_id.SCHEMA_NAME,rpd_table_id.TABLE_NAME"""),
                ("rapid total table size",
                 "SELECT SUM(IFNULL(SIZE_BYTES,0)) FROM performance_schema.rpd_tables"
                 ),
                ("rapid avail_rnstate nodes",
                 """SELECT IFNULL(SUM(memory_total), 0), IFNULL(SUM(memory_usage), 0), IFNULL(SUM(BASEREL_MEMORY_USAGE),0)
                FROM performance_schema.rpd_nodes
                WHERE status = 'AVAIL_RNSTATE'""")
            ]

        collect_tables(zf, prefix, self.session, tables, as_yaml=True)

        collect_queries(zf, prefix, self.session, queries, as_yaml=True)
        self.collect_pfs_config(zf, prefix)

    def collect_pfs_config(self, zf: zipfile.ZipFile, prefix: str):
        # pfs configuration
        queries = [
            ("pfs actors", "SELECT * FROM performance_schema.setup_actors"),
            ("pfs objects", "SELECT * FROM performance_schema.setup_objects"),
            ("pfs consumers",
             """SELECT NAME AS Consumer, ENABLED, sys.ps_is_consumer_enabled(NAME) AS COLLECTS
            FROM performance_schema.setup_consumers"""),
            ("pfs instruments",
             """SELECT SUBSTRING_INDEX(NAME, '/', 2) AS 'InstrumentClass',
                ROUND(100*SUM(IF(ENABLED = 'YES', 1, 0))/COUNT(*), 2) AS 'EnabledPct',
                ROUND(100*SUM(IF(TIMED = 'YES', 1, 0))/COUNT(*), 2) AS 'TimedPct'
            FROM performance_schema.setup_instruments
            GROUP BY SUBSTRING_INDEX(NAME, '/', 2)
            ORDER BY SUBSTRING_INDEX(NAME, '/', 2)"""),
            ("pfs threads",
             """SELECT `TYPE` AS ThreadType, COUNT(*) AS 'Total', ROUND(100*SUM(IF(INSTRUMENTED = 'YES', 1, 0))/COUNT(*), 2) AS 'InstrumentedPct'
            FROM performance_schema.threads
            GROUP BY TYPE""")
        ]
        collect_queries(zf, prefix, self.session, queries, as_yaml=True)

    def collect_other_stats(self, zf: zipfile.ZipFile, prefix: str,
                            iteration: Optional[int]):
        tables = [
            "performance_schema.metadata_locks", "performance_schema.threads",
            "sys.schema_table_lock_waits", "sys.session_ssl_status",
            "sys.session", "sys.processlist",
            "performance_schema.events_waits_current",
            "information_schema.innodb_trx",
            "information_schema.innodb_metrics"
        ]

        if self.pfs_events_wait_history_long:
            tables.append("sys.latest_file_io")

        if self.pfs_memory_instrumented:
            tables += [
                "sys.memory_by_host_by_current_bytes",
                "sys.memory_by_thread_by_current_bytes",
                "sys.memory_by_user_by_current_bytes",
                "sys.memory_global_by_current_bytes"
            ]

        queries = [
            "SHOW GLOBAL STATUS", "SHOW ENGINE INNODB STATUS",
            "SHOW ENGINE PERFORMANCE_SCHEMA STATUS", "SHOW FULL PROCESSLIST",
            "SHOW OPEN TABLES"
        ]
        if self.innodb_mutex:
            queries += ["SHOW ENGINE INNODB MUTEX"]

        if self.session.has_ndb:
            queries += [
                "SHOW ENGINE NDBCLUSTER STATUS",
                ("ndb memoryusage",
                 """SELECT node_id, memory_type, format_bytes(used) AS used, used_pages, format_bytes(total) AS total, total_pages,
                   ROUND(100*(used/total), 2) AS 'Used %'
            FROM ndbinfo.memoryusage""")
            ]

            tables += [
                t for t in self.ndbinfo_tables if t != "ndbinfo.memoryusage"
            ]
            tables += ["information_schema.FILES"]

        if iteration is not None:
            prefix += f"-raw/iteration-{iteration}."

        collect_tables(zf, prefix, self.session, tables, as_yaml=True)
        collect_queries(zf, prefix, self.session, queries, as_yaml=True)

    def dump_metrics_delta(self, zf: zipfile.ZipFile, prefix: str):
        """
        Format all collected metrics iterations and format into a human readable
        form, also including delta values between each iteration.
        """

        if not self.session.has_pfs:
            return

        # Some metrics variables doesn't make sense in delta and rate calculations even if they are numeric
        # as they really are more like settings or "current" status.
        no_delta_names = [
            'innodb_buffer_pool_pages_total', 'innodb_page_size',
            'last_query_cost', 'last_query_partial_plans',
            'qcache_total_blocks', 'slave_last_heartbeat',
            'ssl_ctx_verify_depth', 'ssl_ctx_verify_mode',
            'ssl_session_cache_size', 'ssl_verify_depth', 'ssl_verify_mode',
            'ssl_version', 'buffer_flush_lsn_avg_rate',
            'buffer_flush_pct_for_dirty', 'buffer_flush_pct_for_lsn',
            'buffer_pool_pages_total', 'lock_row_lock_time_avg',
            'lock_row_lock_time_max', 'innodb_page_size'
        ]

        def asnum(s):
            try:
                return float(s)
            except:
                return None

        # limit column width because some values can be very long with ndb
        max_field_length = 50

        with zf.open(f"{prefix}-metrics.summary.tsv", "w") as f:
            first = True
            for name, values in self.metrics.items():
                if first:
                    line = ["Variable_name"]
                    for i in range(1, len(values) + 1):
                        line.append(f"Output {i}")
                        if i > 1:
                            line.append(f"Delta ({i-1} -> {i})")
                    line += ["Type", "Enabled"]
                    f.write(("\t".join(line) + "\n").encode("utf-8"))
                    first = False

                line = [name]
                no_delta = name in no_delta_names
                prev = None
                prev_time = 0
                nvalue = None
                for i, value in enumerate(values):
                    delta = ""
                    nvalue = asnum(value)

                    line.append(value[:max_field_length])

                    if i > 0 and not no_delta and value is not None:
                        time = float(self.metrics['UNIX_TIMESTAMP()'][i])
                        prev_time = float(self.metrics['UNIX_TIMESTAMP()'][i -
                                                                           1])

                        if nvalue is not None and prev is not None:
                            if nvalue == prev:
                                delta = "0 (0/sec)"
                            else:
                                delta = f"{nvalue - prev} ({(nvalue-prev) / (time-prev_time):.2f}/sec)"
                            line.append(delta)
                    prev = nvalue

                line += self.metrics_info[name]

                f.write(("\t".join(line) + "\n").encode("utf-8"))


def collect_diagnostics(zf: zipfile.ZipFile,
                        prefix: str,
                        session: InstanceSession,
                        iterations: int,
                        delay: float,
                        pfsInstrumentation: str,
                        innodb_mutex: bool = False,
                        custom_sql: List[str] = [],
                        custom_shell: List[str] = []):
    shell = ShellExecutor(custom_shell, allow_phases=True)
    diag = DiagnosticsSession(session, innodb_mutex=innodb_mutex)
    try:
        custom = SQLExecutor(session, custom_sql, allow_phases=True)

        shell.execute_before(zf, prefix)
        custom.execute_before(zf, prefix)

        diag.collect_configs_and_state(zf, f"{prefix}")
        diag.start(zf, f"{prefix}diagnostics", pfsInstrumentation)

        for i in range(iterations):
            print(
                f"Collecting performance metrics (iteration #{i+1} of {iterations})..."
            )
            diag.iterate(zf, f"{prefix}diagnostics", i + 1)

            shell.execute_during(zf, prefix, i)
            custom.execute_during(zf, prefix, i)

            i += 1
            if i <= iterations - 1:
                print(f"Sleeping for {delay}s...")
                # use SQL sleep because it can be interrupted
                try:
                    session.run_sql("select sleep(?)", [delay])
                except KeyboardInterrupt:
                    print("^C - aborting...")
                    raise
            else:
                break
        if iterations > 0:
            print("Performance metrics collection done")

        diag.finish(zf, f"{prefix}diagnostics")

        shell.execute_after(zf, prefix)
        custom.execute_after(zf, prefix)
    except:
        diag.cleanup(zf, prefix)
        raise


def collect_diagnostics_once(zf: zipfile.ZipFile,
                             prefix: str,
                             session: InstanceSession,
                             innodb_mutex: bool = False,
                             custom_sql: List[str] = []):
    diag = DiagnosticsSession(session, innodb_mutex=innodb_mutex)
    try:
        custom = SQLExecutor(session, custom_sql, allow_phases=False)

        custom.execute(zf, prefix)

        diag.collect_configs_and_state(zf, prefix)
        diag.collect_metrics(zf, prefix, None)
        diag.collect_other_stats(zf, prefix, None)
    finally:
        diag.cleanup(zf, prefix)


def collect_cluster_metadata(zf: zipfile.ZipFile, prefix: str,
                             session: InstanceSession, ignore_errors) -> Optional[str]:
    cluster_type = None
    try:
        r = session.run_sql(
            "show full tables in mysql_innodb_cluster_metadata").fetch_all()
    except Error as e:
        if e.code == mysql.ErrorCode.ER_BAD_DB_ERROR or e.code == mysql.ErrorCode.ER_NO_SUCH_TABLE:
            return cluster_type
        if ignore_errors:
            print(f"ERROR: Could not query InnoDB Cluster metadata: {e}")
            with zf.open(
                    make_zipinfo(
                        f"{prefix}mysql_innodb_cluster_metadata.error"),
                    "w") as f:
                f.write(b"show full tables in mysql_innodb_cluster_metadata\n")
                f.write(f"{e}\n".encode("utf-8"))
            return cluster_type
        raise

    rs_possible = False
    cs_possible = False
    print("Dumping mysql_innodb_cluster_metadata schema...")
    for row in r:
        if "async_cluster_views" == row[0]:
            rs_possible = True
        elif "clusterset_views" == row[0]:
            cs_possible = True
        if row[1] != "BASE TABLE" and row[0] not in ("schema_version", ):
            continue
        table = row[0]
        dump_table(zf,
                   f"{prefix}mysql_innodb_cluster_metadata.{table}",
                   session,
                   f"mysql_innodb_cluster_metadata.{table}",
                   as_yaml=True,
                   ignore_errors=ignore_errors)

    cluster_type = "gr"
    if rs_possible or cs_possible:
        t = session.run_sql(
            "select cluster_type from mysql_innodb_cluster_metadata.v2_this_instance").fetch_one()[0]
        if t == "ar":
            cluster_type = "ar"
        else:
            try:
                t = session.run_sql(
                    "select count(*) from mysql_innodb_cluster_metadata.v2_cs_clustersets").fetch_one()[0]
                if t:
                    cluster_type = "cs"
            except:
                pass
    return cluster_type


def get_topology_members(session: InstanceSession):
    try:
        return [(r[0], r[1]) for r in session.run_sql(
            "select instance_id, endpoint from mysql_innodb_cluster_metadata.v2_instances"
        ).fetch_all()]
    except:
        return [(r[0], r[1]) for r in session.run_sql(
            "select instance_id, addresses->>'$.mysqlClassic' from mysql_innodb_cluster_metadata.instances"
        ).fetch_all()]


def collect_error_log_sql(zf: zipfile.ZipFile, path: str,
                          session: InstanceSession,
                          ignore_errors: bool) -> bool:
    if session.version >= 80022:
        print(" - Gathering error_log")

        def filter_pwd(row):
            if "temporary password" in row[5]:
                return None
            return row

        dump_table(zf,
                   path,
                   session,
                   "performance_schema.error_log",
                   filter=filter_pwd,
                   as_yaml=False,
                   ignore_errors=ignore_errors)

        return True
    else:
        return False


def collect_slow_queries(zf: zipfile.ZipFile,
                         prefix: str,
                         session: InstanceSession,
                         *,
                         ignore_errors: bool = False):
    queries = [
        ("slow queries in 95 pctile",
         "SELECT * FROM sys.statements_with_runtimes_in_95th_percentile"),
        ("slow queries summary by rows examined",
         "SELECT DIGEST, substr(DIGEST_TEXT, 1, 50), COUNT_STAR, SUM_ROWS_EXAMINED, SUM_ROWS_SENT, round(SUM_ROWS_SENT/SUM_ROWS_EXAMINED, 5) ratio FROM performance_schema.events_statements_summary_by_digest where DIGEST_TEXT like 'select%' and (SUM_ROWS_SENT/SUM_ROWS_EXAMINED) < .5 ORDER BY SUM_ROWS_EXAMINED/SUM_ROWS_SENT desc limit 20"
         )
    ]

    if "slow_log" in session.mysql_tables:
        queries.append(("slow_log", "SELECT * FROM mysql.slow_log"))

    collect_queries(zf,
                    prefix,
                    session,
                    queries,
                    as_yaml=True,
                    ignore_errors=ignore_errors)


def collect_innodb_cluster_accounts(zf: zipfile.ZipFile,
                                    prefix: str,
                                    session: InstanceSession,
                                    ignore_errors=False):
    print("Collecting InnoDB Cluster accounts and grant information")
    accounts = session.run_sql(
        "select user,host from mysql.user where user like 'mysql_innodb_%'"
    ).fetch_all()
    with zf.open(make_zipinfo(f"{prefix}cluster_accounts.tsv"), "w") as f:
        for row in accounts:
            user, host = row[0], row[1]
            f.write(f"-- {user}@{host}\n".encode("utf-8"))
            try:
                for r in session.run_sql("show grants for ?@?",
                                         args=[user, host]).fetch_all():
                    f.write((r[0] + "\n").encode("utf-8"))
            except Exception as e:
                if ignore_errors:
                    print(
                        f"WARNING: Error getting grants for {user}@{host}: {e}"
                    )
                    f.write(f"Could not get grants for {user}@{host}: {e}\n".
                            encode("utf-8"))
                else:
                    print(
                        f"ERROR: Error getting grants for {user}@{host}: {e}")
                    raise
            f.write(b"\n")


def collect_schema_stats(zf: zipfile.ZipFile,
                         prefix: str,
                         session: InstanceSession,
                         full=False,
                         ignore_errors=False):
    if full:
        print(f"Collecting Schema Information and Statistics")
    queries = [
        ("schema tables without a PK",
         """SELECT t.table_schema, t.table_name, t.table_rows, t.engine, t.data_length, t.index_length
            FROM information_schema.tables t
              LEFT JOIN information_schema.statistics s on t.table_schema=s.table_schema and t.table_name=s.table_name and s.index_name='PRIMARY'
            WHERE s.index_name is NULL and t.table_type = 'BASE TABLE'
                and t.table_schema not in ('performance_schema', 'sys', 'mysql', 'information_schema')"""
         ),
        ("schema routine size",
         "SELECT ROUTINE_TYPE, COUNT(*), SUM(LENGTH(ROUTINE_DEFINITION)) FROM information_schema.ROUTINES GROUP BY ROUTINE_TYPE;"
         ),
        ("schema table count",
         "SELECT count(*) FROM information_schema.tables"),
        ("schema unused indexes", "SELECT * FROM sys.schema_unused_indexes"),
    ]
    if full:
        queries += [
            ("schema object overview",
             "select * from sys.schema_object_overview"),
            ("schema top biggest tables",
             """select t.table_schema, t.table_name, t.row_format, t.table_rows, t.avg_row_length, t.data_length, t.max_data_length, t.index_length, t.table_collation,
        json_objectagg(idx.index_name, json_object('columns', idx.col, 'type', idx.index_type, 'cardinality', idx.cardinality)) indexes,
        group_concat((select concat(c.column_name, ':', c.column_type)
          from information_schema.columns c
          where c.table_schema = t.table_schema and c.table_name = t.table_name and c.column_type in ('blob'))) blobs
    from information_schema.tables t
    join (select s.table_schema, s.table_name, s.index_name, s.index_type, s.cardinality, json_arrayagg(concat(c.column_name, ':', c.column_type)) col
          from information_schema.statistics s left join information_schema.columns c on s.table_schema=c.table_schema and s.table_name=c.table_name and s.column_name=c.column_name
          group by s.table_schema, s.table_name, s.index_name, s.index_type, s.cardinality
          order by s.table_schema, s.table_name, s.index_name, s.index_type, s.cardinality) idx
    on idx.table_schema=t.table_schema and idx.table_name = t.table_name
    where t.table_type = 'BASE TABLE' and t.table_schema not in ('mysql', 'information_schema', 'performance_schema')
    group by t.table_schema, t.table_name, t.engine, t.row_format, t.table_rows, t.avg_row_length, t.data_length, t.max_data_length, t.index_length, t.table_collation
    order by t.data_length desc limit 20"""),
            ("schema table engines", """SELECT ENGINE, COUNT(*) AS NUM_TABLES,
                sys.format_bytes(SUM(DATA_LENGTH)) AS DATA_LENGTH,
                sys.format_bytes(SUM(INDEX_LENGTH)) AS INDEX_LENGTH,
                sys.format_bytes(SUM(DATA_LENGTH+INDEX_LENGTH)) AS TOTAL
            FROM information_schema.TABLES
            GROUP BY ENGINE"""),
            ("schema table info", "SELECT * FROM information_schema.TABLES")
        ]
    collect_queries(zf,
                    f"{prefix}",
                    session,
                    queries,
                    as_yaml=True,
                    ignore_errors=ignore_errors)


def collect_table_info(zf: zipfile.ZipFile, prefix: str,
                       session: InstanceSession, schema: str, table: str):
    with zf.open(
            make_zipinfo(
                f"{prefix}referenced_table-{sanitize(schema)}.{sanitize(table)}.yaml"
            ), "w") as f:

        def pythonize(d):
            return json.loads(str(d).replace("\n", "\\n"))

        table_status = session.run_sql(
            "SELECT * FROM information_schema.tables WHERE table_schema=? AND table_name=?",
            [schema, table]).fetch_one_object()

        if not table_status:
            print(
                f"WARNING: Could not find table `{schema}`.`{table}` referenced in query (try 'USE schema' first)"
            )
            return

        table_status = pythonize(table_status)

        indexes = []
        triggers = []
        table_stats = []
        index_stats = []
        if table_status["TABLE_TYPE"] == "VIEW":
            ddl = session.run_sql(
                f"SHOW CREATE VIEW {mysql.quote_identifier(schema)}.{mysql.quote_identifier(table)}"
            ).fetch_one()[0]
        else:
            ddl = session.run_sql(
                f"SHOW CREATE TABLE {mysql.quote_identifier(schema)}.{mysql.quote_identifier(table)}"
            ).fetch_one()[1]

            res = session.run_sql(
                f"SHOW INDEX IN {mysql.quote_identifier(schema)}.{mysql.quote_identifier(table)}"
            )
            for r in iter(res.fetch_one_object, None):
                indexes.append(pythonize(r))

            res = session.run_sql(
                "SELECT * FROM information_schema.triggers WHERE EVENT_OBJECT_SCHEMA=? AND EVENT_OBJECT_TABLE=?",
                [schema, table])
            for r in iter(res.fetch_one_object, None):
                triggers.append(pythonize(r))

            res = session.run_sql(
                "SELECT * FROM mysql.innodb_table_stats WHERE database_name=? AND table_name=?",
                [schema, table])
            for r in iter(res.fetch_one_object, None):
                table_stats.append(pythonize(r))

            res = session.run_sql(
                "SELECT * FROM mysql.innodb_index_stats WHERE database_name=? AND table_name=?",
                [schema, table])
            for r in iter(res.fetch_one_object, None):
                table_stats.append(pythonize(r))

        info = {
            "Table Name": table,
            "Table Schema": schema,
            "Table Status": table_status,
            "DDL": ddl,
            "Indexes": indexes,
            "Triggers": triggers,
            "InnoDB Table Stats": table_stats,
            "InnoDB Index Stats": index_stats
        }

        f.write(yaml.dump(info).encode("utf-8"))


def explain_query(zf: zipfile.ZipFile, session: InstanceSession, query: str,
                  prefix: str) -> dict:
    before = [
        "SET SESSION optimizer_trace='enabled=on'",
        "SET optimizer_trace_offset=-1", "SET optimizer_trace_limit=1"
    ]
    for q in before:
        session.run_sql(q)

    dump_query(zf,
               f"{prefix}explain",
               session,
               f"EXPLAIN {query}",
               as_yaml=False,
               include_warnings=True)
    dump_query(zf,
               f"{prefix}explain-optimizer_trace",
               session,
               "SELECT * FROM information_schema.optimizer_trace",
               as_yaml=True)

    after = ["SET SESSION optimizer_trace='enabled=off'"]
    for q in after:
        session.run_sql(q)

    queries = [(f"explain_json", f"EXPLAIN format=json {query}")]
    if session.version >= 80018:
        queries.append(("explain_analyze", f"EXPLAIN ANALYZE {query}"))
    return collect_queries(zf, prefix, session, queries, include_warnings=True)


def explain_heatwave_query(zf: zipfile.ZipFile, session: InstanceSession,
                           query: str, prefix: str):
    queries = [
        "SET SESSION use_secondary_engine=ON",
        "SELECT NOW()",
        f"EXPLAIN {query}",
        "SHOW SESSION STATUS LIKE 'rapid%'",
        "SELECT NOW()",
        "SET SESSION optimizer_trace='enabled=on'",
        # why -2? idk, see https://dev.mysql.com/doc/heatwave/en/heatwave-running-queries.html#heatwave-debugging-queries
        "SET optimizer_trace_offset=-2",
        f"EXPLAIN {query}",
        "SELECT query, trace->'$**.Rapid_Offload_Fails', trace->'$**.secondary_engine_not_used' FROM information_schema.optimizer_trace",
        "SELECT * FROM information_schema.optimizer_trace",
        "SET SESSION use_secondary_engine=OFF",
        "SET SESSION optimizer_trace='enabled=off'"
    ]

    collect_queries_single_file(zf, f"{prefix}explain-rapid.txt", session,
                                queries)

Youez - 2016 - github.com/yon3zu
LinuXploit