
snowpark

pinky_snowpark.snowpark

Snowpark helpers — session utilities, stage I/O, telemetry, PGP, rotating filenames.

All Snowflake / Snowpark imports are lazy: the module is importable without a Snowflake connection (local scripts, CI).
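Here is a minimal sketch of the lazy-import approach. It assumes the type-only Snowpark import sits behind typing.TYPE_CHECKING, which this page does not show. collect_dicts is copied from its listing further down. StubRow and StubDataFrame are hypothetical stand-ins so the snippet runs where the Snowflake packages are not installed.

```python
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Assumption: type-only import, evaluated by type checkers but never at runtime.
    from snowflake import snowpark


def collect_dicts(df: "snowpark.DataFrame") -> list:
    # Same body as the library function: no Snowflake import is needed to run it.
    return [r.as_dict(recursive=True) for r in df.collect()]


def open_stage_file(url: str):
    # Heavy dependency imported only when this function is actually called.
    from snowflake.snowpark.files import SnowflakeFile
    return SnowflakeFile.open(url, "rb")


class StubRow:
    """Hypothetical stand-in for a Snowpark Row."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def as_dict(self, recursive: bool = False) -> dict:
        return dict(self._fields)


class StubDataFrame:
    """Hypothetical stand-in for a Snowpark DataFrame."""

    def __init__(self, rows: list) -> None:
        self._rows = rows

    def collect(self) -> list:
        return self._rows


rows = collect_dicts(StubDataFrame([StubRow(ID=1, NAME="a"), StubRow(ID=2, NAME="b")]))
print(rows)  # [{'ID': 1, 'NAME': 'a'}, {'ID': 2, 'NAME': 'b'}]
```

Defining this module never imports Snowflake. The import only happens when open_stage_file is called.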

Most functions require an active Snowpark session. The PGP functions additionally require the pgp optional dependency:

pip install pinky-snowpark[pgp]

AvroFormat dataclass

Avro output via DataFrameWriter. Binary format — PGP not applicable.

Source code in src/pinky_snowpark/snowpark.py
@dataclass
class AvroFormat:
    """Avro output via ``DataFrameWriter``. Binary format — PGP not applicable."""

CsvFormat dataclass

CSV (or positional text) output options. PGP encryption is supported.

delimiter, lineterminator and quoting are the actual csv.writer parameters, with no indirection through a preset name; header and positional are output options layered on top of them. Use the module-level preset instances (CSV_PIPE, CSV_TAB, …) as starting points, or build your own. Combine with dataclasses.replace to override individual fields:

from dataclasses import replace
fmt = replace(CSV_PIPE, header=True, lineterminator="\r\n")

Attributes:

  • delimiter (str): Field separator (default ",").
  • lineterminator (str): Row terminator (default "\n").
  • quoting (int): csv.QUOTE_* mode (default csv.QUOTE_MINIMAL).
  • header (bool): Write column names as the first row (default False).
  • positional (bool): Fixed-width mode: write only str(row[0]) per line, without csv.writer. Useful for single-column text extracts.

Source code in src/pinky_snowpark/snowpark.py
@dataclass
class CsvFormat:
    """CSV (or positional text) output options. PGP encryption is supported.

    All fields are the actual csv.writer parameters — no indirection through a
    preset name. Use the module-level preset instances (``CSV_PIPE``, ``CSV_TAB``,
    …) as starting points, or build your own. Combine with ``dataclasses.replace``
    to override individual fields::

        from dataclasses import replace
        fmt = replace(CSV_PIPE, header=True, lineterminator="\\r\\n")

    Attributes:
        delimiter:       Field separator (default ``","``).
        lineterminator:  Row terminator (default ``"\\n"``).
        quoting:         ``csv.QUOTE_*`` mode (default ``csv.QUOTE_MINIMAL``).
        header:          Write column names as the first row (default ``False``).
        positional:      Fixed-width mode — write only ``str(row[0])`` per line,
                         no csv.writer. Useful for single-column text extracts.
    """
    delimiter: str = ","
    lineterminator: str = "\n"
    quoting: int = csv.QUOTE_MINIMAL
    header: bool = False
    positional: bool = False
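The sketch below shows how the CsvFormat fields could drive csv.writer, including the header and positional options. It is an illustration built on assumptions. write_rows and the PIPE preset are hypothetical, because the library's writer and the actual values of CSV_PIPE are not shown. The dataclass is copied locally so the snippet runs on its own.

```python
import csv
import io
from dataclasses import dataclass, replace


@dataclass
class CsvFormat:
    # Local copy of the documented dataclass so the snippet runs standalone.
    delimiter: str = ","
    lineterminator: str = "\n"
    quoting: int = csv.QUOTE_MINIMAL
    header: bool = False
    positional: bool = False


# Hypothetical preset standing in for the library's CSV_PIPE (its values are not shown).
PIPE = CsvFormat(delimiter="|")


def write_rows(fmt: CsvFormat, columns: list, rows: list) -> str:
    """Render rows as text according to fmt (hypothetical helper)."""
    buf = io.StringIO()
    if fmt.positional:
        # Positional mode: one str(row[0]) per line; csv.writer is bypassed.
        for row in rows:
            buf.write(str(row[0]) + fmt.lineterminator)
        return buf.getvalue()
    writer = csv.writer(buf, delimiter=fmt.delimiter,
                        lineterminator=fmt.lineterminator, quoting=fmt.quoting)
    if fmt.header:
        writer.writerow(columns)
    writer.writerows(rows)
    return buf.getvalue()


fmt = replace(PIPE, header=True, lineterminator="\r\n")
text = write_rows(fmt, ["ID", "NAME"], [(1, "a|b"), (2, "c")])
print(repr(text))
```

Presets are ordinary dataclass instances. As a result, replace returns a modified copy and leaves the shared preset untouched. In this example, the value containing the delimiter is quoted because of QUOTE_MINIMAL.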

EnvPattern

Bases: Enum

Strategy for deriving the current environment from Snowflake session context.

Pinky convention: ACCOUNT, where one account is one environment. Legacy convention: DATABASE_SUFFIX, kept for backward compatibility with databases named with a _SANDBOX / _PRODUCTION suffix.

Attributes:

  • ACCOUNT: One Snowflake account = one environment. This is the recommended pinky convention, although get_environment itself defaults to DATABASE_SUFFIX.
  • DATABASE_SUFFIX: MY_DB_SANDBOX → SANDBOX.
  • DATABASE_PREFIX: SANDBOX_MY_DB → SANDBOX.
  • SCHEMA_SUFFIX: MY_SCHEMA_QA → QA.
  • SCHEMA_PREFIX: QA_MY_SCHEMA → QA.

Source code in src/pinky_snowpark/snowpark.py
class EnvPattern(Enum):
    """Strategy for deriving the current environment from Snowflake session context.

    Pinky convention: ``ACCOUNT`` — one account = one environment.
    Legacy convention: ``DATABASE_SUFFIX`` — retro-compat with ``_SANDBOX`` / ``_PRODUCTION`` suffix.

    Attributes:
        ACCOUNT:         One Snowflake account = one environment (pinky default).
        DATABASE_SUFFIX: ``MY_DB_SANDBOX`` → ``SANDBOX``.
        DATABASE_PREFIX: ``SANDBOX_MY_DB`` → ``SANDBOX``.
        SCHEMA_SUFFIX:   ``MY_SCHEMA_QA`` → ``QA``.
        SCHEMA_PREFIX:   ``QA_MY_SCHEMA`` → ``QA``.
    """
    ACCOUNT = "ACCOUNT"
    DATABASE_SUFFIX = "DATABASE_SUFFIX"
    DATABASE_PREFIX = "DATABASE_PREFIX"
    SCHEMA_SUFFIX = "SCHEMA_SUFFIX"
    SCHEMA_PREFIX = "SCHEMA_PREFIX"
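Each prefix and suffix rule comes down to splitting a name on _ and keeping the first or last token. This minimal sketch mirrors the string handling in get_environment: strip quotes, upper-case, then split. env_from_name is a hypothetical helper that takes the name directly instead of reading it from a session.

```python
from enum import Enum


class EnvPattern(Enum):
    # Local copy of the documented enum members.
    ACCOUNT = "ACCOUNT"
    DATABASE_SUFFIX = "DATABASE_SUFFIX"
    DATABASE_PREFIX = "DATABASE_PREFIX"
    SCHEMA_SUFFIX = "SCHEMA_SUFFIX"
    SCHEMA_PREFIX = "SCHEMA_PREFIX"


def env_from_name(name: str, pattern: EnvPattern) -> str:
    """Apply a prefix/suffix rule to a database or schema name (hypothetical helper)."""
    token = name.strip('"').upper()
    if pattern in (EnvPattern.DATABASE_SUFFIX, EnvPattern.SCHEMA_SUFFIX):
        return token.split("_")[-1]   # last _-separated token
    if pattern in (EnvPattern.DATABASE_PREFIX, EnvPattern.SCHEMA_PREFIX):
        return token.split("_")[0]    # first _-separated token
    raise ValueError(f"{pattern!r} is not a prefix/suffix pattern")


print(env_from_name('"my_db_sandbox"', EnvPattern.DATABASE_SUFFIX))  # SANDBOX
print(env_from_name("QA_MY_SCHEMA", EnvPattern.SCHEMA_PREFIX))        # QA
print(env_from_name("ANALYTICS", EnvPattern.DATABASE_SUFFIX))         # ANALYTICS
```

The token is returned unconditionally. A name without an environment marker, as in the last call, therefore yields an unrelated value instead of raising an error.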

JsonFormat dataclass

NDJSON output — one JSON object per line. PGP encryption is supported.

Source code in src/pinky_snowpark/snowpark.py
@dataclass
class JsonFormat:
    """NDJSON output — one JSON object per line. PGP encryption is supported."""
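In NDJSON, each line holds one complete JSON document. Here is a minimal sketch using the standard json module. to_ndjson is a hypothetical helper. The default=str fallback for non-JSON types such as dates is an assumption, because the library's own serializer is not shown.

```python
import json
from datetime import date


def to_ndjson(rows: list) -> str:
    # One JSON object per line; default=str (assumption) renders dates as text.
    return "".join(json.dumps(row, default=str) + "\n" for row in rows)


out = to_ndjson([{"ID": 1, "HIRED": date(2024, 3, 15)}, {"ID": 2, "HIRED": None}])
print(out, end="")
```

Each line can be parsed on its own. This is why the format suits line-oriented text encryption and streaming reads.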

ParquetFormat dataclass

Parquet output via DataFrameWriter. Binary format — PGP not applicable.

Source code in src/pinky_snowpark/snowpark.py
@dataclass
class ParquetFormat:
    """Parquet output via ``DataFrameWriter``. Binary format — PGP not applicable."""

collect_dicts(df)

Collect a Snowpark DataFrame as a list of plain dicts.

Shorthand for [r.as_dict(recursive=True) for r in df.collect()].

Parameters:

  • df ('snowpark.DataFrame', required): Snowpark DataFrame to collect.

Returns:

  • list[dict]: List of row dicts with nested structures recursively expanded.

Source code in src/pinky_snowpark/snowpark.py
def collect_dicts(df: "snowpark.DataFrame") -> list[dict]:
    """Collect a Snowpark DataFrame as a list of plain dicts.

    Shorthand for ``[r.as_dict(recursive=True) for r in df.collect()]``.

    Args:
        df: Snowpark DataFrame to collect.

    Returns:
        List of row dicts with nested structures recursively expanded.
    """
    return [r.as_dict(recursive=True) for r in df.collect()]

decrypt_file(session, stage, file_path, private_key, passphrase, file_format='csv', columns=None, field_delimiter=',', encoding='utf-8', skip_header=0)

Decrypt a PGP-encrypted file from a Snowflake stage and return a DataFrame.

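Downloads the encrypted file via a scoped URL, decrypts it with the private key, then builds a DataFrame from the decrypted content. CSV content is parsed in memory into a DataFrame whose columns are all strings, with empty cells stored as NULL. For the other formats the decrypted file is written to a local temporary file, re-uploaded to _DECRYPT_TEMP_STAGE, and read back with the matching session.read method.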

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • stage (str, required): Source stage containing the encrypted file.
  • file_path (str, required): Relative path of the file within the stage.
  • private_key ('pgpy.PGPKey', required): PGP private key for decryption.
  • passphrase (str, required): Passphrase to unlock the private key.
  • file_format (str, default 'csv'): Format of the decrypted file, one of csv, json, jsonl, avro, parquet, orc, xml.
  • columns (list[str] | None, default None): Column names for CSV files without a header row (None → the first row is the header).
  • field_delimiter (str, default ','): CSV field delimiter.
  • encoding (str, default 'utf-8'): Text encoding.
  • skip_header (int, default 0): Number of leading CSV rows to skip before the header row.

Returns:

  • 'snowpark.DataFrame': Snowpark DataFrame containing the decrypted data.

Raises:

  • ValueError: If file_path is invalid or file_format is unsupported.

Source code in src/pinky_snowpark/snowpark.py
def decrypt_file(
    session: "snowpark.Session",
    stage: str,
    file_path: str,
    private_key: "pgpy.PGPKey",
    passphrase: str,
    file_format: str = "csv",
    columns: list[str] | None = None,
    field_delimiter: str = ",",
    encoding: str = "utf-8",
    skip_header: int = 0,
) -> "snowpark.DataFrame":
    """Decrypt a PGP-encrypted file from a Snowflake stage and return a DataFrame.

    Downloads the encrypted file via a scoped URL, decrypts it with the
    private key, then builds a DataFrame from the decrypted content.
    For non-CSV formats the decrypted file is re-uploaded to
    ``_DECRYPT_TEMP_STAGE`` before reading.

    Args:
        session:         Active Snowpark session.
        stage:           Source stage containing the encrypted file.
        file_path:       Relative path of the file within the stage.
        private_key:     PGP private key for decryption.
        passphrase:      Passphrase to unlock the private key.
        file_format:     Format of the decrypted file —
                         ``csv`` | ``json`` | ``jsonl`` | ``avro`` | ``parquet``
                         | ``orc`` | ``xml``.
        columns:         Column names for CSV files without a header row
                         (``None`` → first row is the header).
        field_delimiter: CSV field delimiter (default ``","``).
        encoding:        Text encoding (default ``"utf-8"``).
        skip_header:     Number of header rows to skip for CSV (default ``0``).

    Returns:
        Snowpark DataFrame containing the decrypted data.

    Raises:
        ValueError: If ``file_path`` is invalid or ``file_format`` is unsupported.
    """
    import pgpy
    from snowflake.snowpark.files import SnowflakeFile
    from snowflake.snowpark.types import StringType, StructField, StructType

    safe_filepath(file_path)
    if file_format not in _SUPPORTED_FORMATS:
        raise ValueError(
            f"Unsupported format {file_format!r} — supported: {', '.join(sorted(_SUPPORTED_FORMATS))}"
        )

    file_url = stage_scoped_file_url(session, stage, file_path)

    with SnowflakeFile.open(file_url, "rb") as f:
        encrypted_data = f.read()

    with private_key.unlock(passphrase):
        encrypted_message = pgpy.PGPMessage.from_blob(encrypted_data)
        decrypted_message = private_key.decrypt(encrypted_message)

    if file_format == "csv":
        # pgpy may hand back str (text literal data) or bytes/bytearray.
        msg = decrypted_message.message
        text = msg.decode(encoding) if isinstance(msg, (bytes, bytearray)) else msg
        rows = list(csv.reader(StringIO(text), delimiter=field_delimiter))[skip_header:]
        # Explicit column names mean the file has no header row: keep every row.
        headers, body = (columns, rows) if columns else (rows[0], rows[1:])
        n = len(headers)
        data = [
            [(cell or None) for cell in row] + [None] * (n - len(row))
            for row in body
        ]
        schema = StructType([StructField(h, StringType(), nullable=True) for h in headers])
        return session.create_dataframe(data, schema=schema)

    tmp_path = f"/tmp/decrypted.{file_format}"
    with open(tmp_path, "wb") as f:
        msg = decrypted_message.message
        f.write(msg if isinstance(msg, (bytes, bytearray)) else msg.encode(encoding))

    _ensure_temp_stage(session, _DECRYPT_TEMP_STAGE)
    session.file.put(
        local_file_name=f"file://{tmp_path}",
        stage_location=_DECRYPT_TEMP_STAGE,
        auto_compress=False,
        overwrite=True,
    )

    stage_path = f"{_DECRYPT_TEMP_STAGE}/decrypted.{file_format}"
    reader_dispatch = {
        "json":    lambda: session.read.json(stage_path),
        "jsonl":   lambda: session.read.json(stage_path),
        "avro":    lambda: session.read.avro(stage_path),
        "parquet": lambda: session.read.parquet(stage_path),
        "orc":     lambda: session.read.orc(stage_path),
        "xml":     lambda: session.read.xml(stage_path),
    }
    return reader_dispatch[file_format]()
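The CSV branch can be exercised without Snowflake. This sketch mirrors its parsing rules under the documented meaning of columns, where explicit names mean the file has no header row. The steps are: decode, skip skip_header rows, take the header, turn empty cells into None, and pad short rows. parse_decrypted_csv is a hypothetical helper. It returns (headers, data) instead of building a Snowpark DataFrame.

```python
import csv
from io import StringIO


def parse_decrypted_csv(payload, columns=None, field_delimiter=",",
                        encoding="utf-8", skip_header=0):
    """Hypothetical helper: CSV rules of decrypt_file, minus Snowpark."""
    # The decrypted payload may be str or bytes/bytearray.
    text = payload.decode(encoding) if isinstance(payload, (bytes, bytearray)) else payload
    rows = list(csv.reader(StringIO(text), delimiter=field_delimiter))[skip_header:]
    # Explicit column names => no header row in the file, so keep every row.
    headers, body = (list(columns), rows) if columns else (rows[0], rows[1:])
    n = len(headers)
    # Empty cells become None; short rows are padded with None to the header width.
    data = [[(cell or None) for cell in row] + [None] * (n - len(row)) for row in body]
    return headers, data


headers, data = parse_decrypted_csv(b"ID;NAME;CITY\n1;Ana;\n2;Bo\n", field_delimiter=";")
print(headers, data)
```

Every value stays a string or None. This matches the all-StringType schema the listing builds.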

df_dynamic_row(df, delimiter=';', null_replacement='', col_name='ROW', date_format=None, timestamp_format=None, number_format=None)

Collapse all columns of a DataFrame into a single delimited-string column.

Uses concat_ws(lit(delimiter), "*") where "*" selects all columns of the current DataFrame — the column count does not need to be known in advance. This makes union_all possible between DataFrames with heterogeneous schemas: reduce each block to (key_col, ROW) before assembling.

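null_replacement is substituted for NULL values before concatenation (via na.replace). This step matters because Snowflake's CONCAT_WS returns NULL as soon as any argument is NULL. An unreplaced NULL would therefore blank out the whole row instead of leaving an empty field. Snowpark applies a string replacement only to string-typed columns. NULLs in date, timestamp or numeric columns are therefore covered only once the matching *_format option has turned those columns into strings.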

When date_format, timestamp_format, or number_format are set, to_char is applied to columns of the matching type before concatenation, preventing Snowflake's implicit cast from producing ISO timestamps or scientific-notation numbers in the output.

Format strings follow Snowflake's TO_CHAR conventions:

  • Date / timestamp: 'DD/MM/YYYY', 'YYYY-MM-DD HH24:MI:SS'
  • Number: 'FM999999990.00', '9,999.99'

Parameters:

  • df ('snowpark.DataFrame', required): Source DataFrame (any number of columns).
  • delimiter (str, default ';'): Field separator.
  • null_replacement (str, default ''): String substituted for NULL values.
  • col_name (str, default 'ROW'): Name of the resulting column.
  • date_format (str | None, default None): TO_CHAR format applied to DateType columns. None = Snowflake default cast.
  • timestamp_format (str | None, default None): TO_CHAR format applied to TimestampType columns. None = Snowflake default cast.
  • number_format (str | None, default None): TO_CHAR format applied to numeric columns (DecimalType, DoubleType, FloatType, LongType, IntegerType, ShortType). None = Snowflake default cast.

Returns:

  • 'snowpark.DataFrame': Single-column DataFrame named col_name.

Example:

# Blocks with different schemas unified into one file
df_file = (
    df_header.select(col("RECORD_ID"), df_dynamic_row(df_header).ROW)
    .union_all(df_body.select(
        col("RECORD_ID"),
        df_dynamic_row(
            df_body,
            date_format="DD/MM/YYYY",
            number_format="FM999999990.00",
        ).ROW,
    ))
    .sort(col("RECORD_ID"))
)
Source code in src/pinky_snowpark/snowpark.py
def df_dynamic_row(
    df: "snowpark.DataFrame",
    delimiter: str = ";",
    null_replacement: str = "",
    col_name: str = "ROW",
    date_format: str | None = None,
    timestamp_format: str | None = None,
    number_format: str | None = None,
) -> "snowpark.DataFrame":
    """Collapse all columns of a DataFrame into a single delimited-string column.

    Uses ``concat_ws(lit(delimiter), "*")`` where ``"*"`` selects all columns
    of the current DataFrame — the column count does not need to be known in
    advance. This makes ``union_all`` possible between DataFrames with
    heterogeneous schemas: reduce each block to ``(key_col, ROW)`` before
    assembling.

    ``null_replacement`` is applied before concatenation via ``na.replace``.
    Without this step ``concat_ws`` silently drops ``NULL`` values, producing
    rows with fewer fields than expected.

    When ``date_format``, ``timestamp_format``, or ``number_format`` are set,
    ``to_char`` is applied to columns of the matching type before concatenation,
    preventing Snowflake's implicit cast from producing ISO timestamps or
    scientific-notation numbers in the output.

    Format strings follow Snowflake's ``TO_CHAR`` conventions:

    - Date / timestamp: ``'DD/MM/YYYY'``, ``'YYYY-MM-DD HH24:MI:SS'`` …
    - Number: ``'FM999999990.00'``, ``'9,999.99'`` …

    Args:
        df:               Source DataFrame (any number of columns).
        delimiter:        Field separator (default ``";"``).
        null_replacement: String substituted for ``NULL`` values (default ``""``).
        col_name:         Name of the resulting column (default ``"ROW"``).
        date_format:      ``TO_CHAR`` format applied to ``DateType`` columns.
                          ``None`` = Snowflake default cast.
        timestamp_format: ``TO_CHAR`` format applied to ``TimestampType`` columns.
                          ``None`` = Snowflake default cast.
        number_format:    ``TO_CHAR`` format applied to numeric columns
                          (``DecimalType``, ``DoubleType``, ``FloatType``,
                          ``LongType``, ``IntegerType``).
                          ``None`` = Snowflake default cast.

    Returns:
        Single-column DataFrame named ``col_name``.

    Example::

        # Blocks with different schemas unified into one file
        df_file = (
            df_header.select(col("RECORD_ID"), df_dynamic_row(df_header).ROW)
            .union_all(df_body.select(
                col("RECORD_ID"),
                df_dynamic_row(
                    df_body,
                    date_format="DD/MM/YYYY",
                    number_format="FM999999990.00",
                ).ROW,
            ))
            .sort(col("RECORD_ID"))
        )
    """
    from snowflake.snowpark.functions import col, concat_ws, lit, to_char
    from snowflake.snowpark.types import (
        DateType,
        DecimalType,
        DoubleType,
        FloatType,
        IntegerType,
        LongType,
        ShortType,
        TimestampType,
    )

    _NUMERIC_TYPES = (DecimalType, DoubleType, FloatType, LongType, IntegerType, ShortType)

    if date_format or timestamp_format or number_format:
        exprs = []
        for field in df.schema.fields:
            c = col(field.column_identifier.quoted_name)
            if date_format and isinstance(field.datatype, DateType):
                c = to_char(c, date_format)
            elif timestamp_format and isinstance(field.datatype, TimestampType):
                c = to_char(c, timestamp_format)
            elif number_format and isinstance(field.datatype, _NUMERIC_TYPES):
                c = to_char(c, number_format)
            exprs.append(c)
        df = df.select(exprs)

    # to_replace=None targets NULL cells, which become null_replacement.
    return df.na.replace(None, null_replacement).select(
        concat_ws(lit(delimiter), "*").alias(col_name)
    )
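The sketch below writes out a roughly equivalent SQL expression for a known list of columns, to show what df_dynamic_row produces. It is an illustration, not the SQL that Snowpark generates. dynamic_row_sql is hypothetical, and column types are simplified to DATE, TIMESTAMP, NUMBER and TEXT labels. NVL around TO_CHAR stands in for the na.replace step, so every field is text before CONCAT_WS receives it.

```python
from typing import Optional


def quote_ident(name: str) -> str:
    # Snowflake double-quoted identifier; embedded double quotes are doubled.
    return '"' + name.replace('"', '""') + '"'


def quote_lit(text: str) -> str:
    # Snowflake single-quoted string literal; embedded single quotes are doubled.
    return "'" + text.replace("'", "''") + "'"


def dynamic_row_sql(
    fields: list,                      # [(column name, "DATE" | "TIMESTAMP" | "NUMBER" | "TEXT")]
    delimiter: str = ";",
    null_replacement: str = "",
    col_name: str = "ROW",
    date_format: Optional[str] = None,
    timestamp_format: Optional[str] = None,
    number_format: Optional[str] = None,
) -> str:
    """Hypothetical helper: SQL roughly equivalent to df_dynamic_row for known columns."""
    formats = {"DATE": date_format, "TIMESTAMP": timestamp_format, "NUMBER": number_format}
    parts = []
    for name, kind in fields:
        fmt = formats.get(kind)
        args = quote_ident(name) + (", " + quote_lit(fmt) if fmt else "")
        # TO_CHAR (with the type's format when one is set), then NVL so that
        # every field is present in the output even when the value is NULL.
        parts.append(f"NVL(TO_CHAR({args}), {quote_lit(null_replacement)})")
    return f"CONCAT_WS({quote_lit(delimiter)}, {', '.join(parts)}) AS {quote_ident(col_name)}"


sql = dynamic_row_sql([("HIRED", "DATE"), ("SALARY", "NUMBER"), ("NAME", "TEXT")],
                      date_format="DD/MM/YYYY", number_format="FM999999990.00")
print(sql)
```

In contrast, the real function uses "*". It therefore never needs the column list, which is what allows blocks with different schemas to be collapsed by the same call.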

execute_statements(session, statements)

Execute a list of SQL statements in sequence.

Pairs naturally with pinky_core.sql.split_sql_statements to run the contents of a SQL file statement by statement.

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • statements (list[str], required): List of SQL strings without a trailing ;. Each statement is stripped before execution, and blank or whitespace-only entries are skipped.

Returns:

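  • list[list[dict]]: List of result sets, one list[dict] per non-empty statement, in input order. In Snowflake, DDL and DML statements usually still return a one-row result (a status message or an affected-row count). Their entry is therefore typically a single-element list, not an empty one.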

Source code in src/pinky_snowpark/snowpark.py
def execute_statements(
    session: "snowpark.Session",
    statements: list[str],
) -> list[list[dict]]:
    """Execute a list of SQL statements in sequence.

    Pairs naturally with ``pinky_core.sql.split_sql_statements`` to
    run the contents of a SQL file statement by statement.

    Args:
        session:    Active Snowpark session.
        statements: List of SQL strings without trailing ``;``.

    Returns:
        List of result sets — one ``list[dict]`` per statement.
        Statements that produce no rows (DDL, DML) return an empty list.
    """
    results = []
    for stmt in statements:
        stmt = stmt.strip()
        if stmt:
            rows = session.sql(stmt).collect()  # noqa: SNOWPARK-API — batch SQL executor by design
            results.append([r.as_dict(recursive=True) for r in rows])
    return results
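The loop can be exercised with a hypothetical stub session. StubSession, StubResult and StubRow are stand-ins, not Snowpark classes. The sketch shows that whitespace-only entries are skipped, so the result may be shorter than the input list. The canned DDL answer imitates the single status row Snowflake returns for a CREATE statement.

```python
from typing import Any


class StubRow:
    """Hypothetical stand-in for a Snowpark Row."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def as_dict(self, recursive: bool = False) -> dict:
        return dict(self._fields)


class StubResult:
    def __init__(self, rows: list) -> None:
        self._rows = rows

    def collect(self) -> list:
        return self._rows


class StubSession:
    """Records executed SQL and answers from a canned mapping."""

    def __init__(self, answers: dict) -> None:
        self.answers = answers
        self.executed = []

    def sql(self, query: str) -> StubResult:
        self.executed.append(query)
        return StubResult(self.answers.get(query, []))


def execute_statements(session, statements: list) -> list:
    # Same loop as the library function.
    results = []
    for stmt in statements:
        stmt = stmt.strip()
        if stmt:
            rows = session.sql(stmt).collect()
            results.append([r.as_dict(recursive=True) for r in rows])
    return results


session = StubSession({
    "CREATE TABLE T (A INT)": [StubRow(status="Table T successfully created.")],
    "SELECT 1 AS X": [StubRow(X=1)],
})
out = execute_statements(session, ["CREATE TABLE T (A INT)", "   ", "SELECT 1 AS X"])
print(out)
```

There are three inputs but only two result sets. Results are positional, so callers that pair them with the input list should filter blank entries first.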

get_current_snowsight_url(session)

Return the Snowsight URL for the current account.

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.

Returns:

  • str: URL of the form https://app.snowflake.com/{org}/{account}, fully lower-cased.

Source code in src/pinky_snowpark/snowpark.py
def get_current_snowsight_url(session: "snowpark.Session") -> str:
    """Return the Snowsight URL for the current account.

    Args:
        session: Active Snowpark session.

    Returns:
        URL of the form ``https://app.snowflake.com/{org}/{account}``.
    """
    return str(
        session.sql(  # noqa: SNOWPARK-API — no Python alternative for CURRENT_ORGANIZATION_NAME()
            "SELECT LOWER(CONCAT_WS('/', ?::STRING, CURRENT_ORGANIZATION_NAME(), CURRENT_ACCOUNT_NAME()))",
            params=[_BASE_SNOWSIGHT_URL],
        ).collect()[0][0]
    )
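The query joins three parts with / and lower-cases the whole string. Below is a pure-Python equivalent. It assumes _BASE_SNOWSIGHT_URL is "https://app.snowflake.com", as the documented return form suggests, since the constant's value is not shown. snowsight_url is a hypothetical helper that takes the organization and account names directly.

```python
BASE_SNOWSIGHT_URL = "https://app.snowflake.com"  # assumed value of _BASE_SNOWSIGHT_URL


def snowsight_url(org: str, account: str, base: str = BASE_SNOWSIGHT_URL) -> str:
    # Mirrors LOWER(CONCAT_WS('/', base, org, account)).
    return "/".join([base, org, account]).lower()


print(snowsight_url("MYORG", "DATA_PROD"))  # https://app.snowflake.com/myorg/data_prod
```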

get_environment(session, pattern=EnvPattern.DATABASE_SUFFIX)

Derive the current environment name from the Snowflake session context.

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • pattern (EnvPattern, default DATABASE_SUFFIX): Strategy for environment detection. Pinky convention: use ACCOUNT (one account = one environment).

Returns:

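  • str: Upper-cased environment name, e.g. SANDBOX or PRODUCTION. The prefix/suffix patterns return the first or last _-separated token of the database or schema name as-is (e.g. QA).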

Raises:

  • ValueError: If the ACCOUNT pattern is requested and the account name contains neither SANDBOX nor PROD, or if the pattern is unsupported.

Example:

from pinky_snowpark import get_environment, EnvPattern

env = get_environment(session, EnvPattern.ACCOUNT)
# → "SANDBOX" or "PRODUCTION"
Source code in src/pinky_snowpark/snowpark.py
def get_environment(
    session: "snowpark.Session",
    pattern: EnvPattern = EnvPattern.DATABASE_SUFFIX,
) -> str:
    """Derive the current environment name from the Snowflake session context.

    Args:
        session: Active Snowpark session.
        pattern: Strategy for environment detection (default ``DATABASE_SUFFIX``).
                 Pinky convention: use ``ACCOUNT`` — one account = one env.

    Returns:
        Environment name — e.g. ``SANDBOX`` or ``PRODUCTION``.

    Raises:
        ValueError: If ``ACCOUNT`` pattern is requested but account name
                    cannot be resolved, or if the pattern is unsupported.

    Example::

        from pinky_snowpark import get_environment, EnvPattern

        env = get_environment(session, EnvPattern.ACCOUNT)
        # → "SANDBOX" or "PRODUCTION"
    """
    if pattern == EnvPattern.ACCOUNT:
        account = str(session.get_current_account()).strip('"').upper()
        if "SANDBOX" in account:
            return "SANDBOX"
        if "PRODUCTION" in account or "PROD" in account:
            return "PRODUCTION"
        raise ValueError(f"Cannot derive environment from account name: {account!r}")

    if pattern == EnvPattern.DATABASE_SUFFIX:
        db = str(session.get_current_database()).strip('"').upper()
        return db.split("_")[-1]

    if pattern == EnvPattern.DATABASE_PREFIX:
        db = str(session.get_current_database()).strip('"').upper()
        return db.split("_")[0]

    if pattern == EnvPattern.SCHEMA_SUFFIX:
        schema = str(session.get_current_schema()).strip('"').upper()
        return schema.split("_")[-1]

    if pattern == EnvPattern.SCHEMA_PREFIX:
        schema = str(session.get_current_schema()).strip('"').upper()
        return schema.split("_")[0]

    raise ValueError(f"Unsupported EnvPattern: {pattern!r}")
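The ACCOUNT rule is a substring test that checks SANDBOX first. This minimal sketch covers only that branch. env_from_account is a hypothetical helper that takes the account name directly. The sketch shows two consequences: any account name containing PROD maps to PRODUCTION, and a name containing both markers resolves to SANDBOX.

```python
def env_from_account(account: str) -> str:
    """ACCOUNT branch of get_environment, taking the name directly (hypothetical helper)."""
    name = account.strip('"').upper()
    if "SANDBOX" in name:   # checked first, so it wins over PROD
        return "SANDBOX"
    if "PROD" in name:      # substring test; also true for PRODUCTION
        return "PRODUCTION"
    raise ValueError(f"Cannot derive environment from account name: {name!r}")


for account in ("acme_prod", '"SANDBOX_PROD_COPY"', "PRODUCT_ANALYTICS"):
    print(account, "→", env_from_account(account))
```

The last name shows that a false positive is possible. An account such as PRODUCT_ANALYTICS is classified as PRODUCTION.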

get_rotating_filename(session, stage, folder, base_name, extension, retention='7d')

Generate a rotating file name based on today's date slot and an upload counter.

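The pattern {base_name}_{slot}_{count}.{extension} is intended to bound retention through slot reuse, so that no cleanup job is required: each slot value recurs once per retention period. Note that count is the number of files already in the stage (under folder) whose names start with {base_name}_{slot}_. This includes files left in the slot by earlier periods, so the counter does not restart at 0 when a slot comes round again.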

Slot assignment:

  • ≤ 7 days → weekday number (1–7)
  • ≤ 31 days → day of month (1–31)
  • ≤ 93 days → Q{quarter}_{day} (e.g. Q1_15)
  • ≤ 186 days → S{half}_{day} (e.g. S1_15)
  • > 186 days → {month}_{day} (e.g. 3_15)

Parameters:

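  • session ('snowpark.Session', required): Active Snowpark session.
  • stage (str, required): Stage name (e.g. "MY_STAGE"); a leading @ is stripped if present. The stage is looked up in the session's current database and schema.
  • folder (str, required): Sub-folder within the stage. It scopes the file count but is not part of the returned name.
  • base_name (str, required): File name prefix (e.g. "EXPORT_EMPLOYEES").
  • extension (str, required): File extension without the leading . (e.g. "csv").
  • retention (str | int, default '7d'): Retention policy: a preset string ("7d", "1m", "3m", "6m", "1y"), a number of days as an int, or a numeric string such as "30". Any other string raises ValueError.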

Returns:

  • str: Computed file name, without the folder (e.g. "EXPORT_EMPLOYEES_3_0.csv").

Source code in src/pinky_snowpark/snowpark.py
def get_rotating_filename(
    session: "snowpark.Session",
    stage: str,
    folder: str,
    base_name: str,
    extension: str,
    retention: str | int = "7d",
) -> str:
    """Generate a rotating file name based on today's date slot and an upload counter.

    The pattern ``{base_name}_{slot}_{count}.{extension}`` provides natural
    retention via overwrite: older slots are reused and overwritten, so no
    cleanup job is required.

    Slot assignment:
    - ≤ 7 days  → weekday number (1–7)
    - ≤ 31 days → day of month (1–31)
    - ≤ 93 days → ``Q{quarter}_{day}``  (e.g. ``Q1_15``)
    - ≤ 186 days → ``S{half}_{day}``    (e.g. ``S1_15``)
    - > 186 days → ``{month}_{day}``    (e.g. ``3_15``)

    Args:
        session:   Active Snowpark session.
        stage:     Stage name without ``@`` (e.g. ``"MY_STAGE"``).
        folder:    Sub-folder within the stage.
        base_name: File name prefix (e.g. ``"EXPORT_EMPLOYEES"``).
        extension: File extension without ``.`` (e.g. ``"csv"``).
        retention: Retention policy — preset string (``"7d"``, ``"1m"``, ``"3m"``,
                   ``"6m"``, ``"1y"``) or number of days (int).

    Returns:
        Computed file name (e.g. ``"EXPORT_EMPLOYEES_3_0.csv"``).
    """
    from snowflake.core import Root

    if isinstance(retention, str):
        retention_days = RETENTION_PRESETS.get(retention)
        if retention_days is None:
            retention_days = int(retention)
    else:
        retention_days = retention

    today = date.today()
    slot = _compute_slot(today, retention_days)
    path_prefix = "/".join(filter(None, [folder, f"{base_name}_{slot}_"]))

    db = str(session.get_current_database()).replace('"', '')
    schema = str(session.get_current_schema()).replace('"', '')
    root = Root(session)
    stage_ref = root.databases[db].schemas[schema].stages[stage.lstrip("@")]
    count_today = len(list(stage_ref.list_files(pattern=f"{path_prefix}.*")))

    return f"{base_name}_{slot}_{count_today}.{extension}"
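This page does not show the slot computation (_compute_slot) or the RETENTION_PRESETS values. The sketch below therefore takes the slot as an input and uses an assumed preset table whose day counts follow the thresholds listed above. It reproduces the remaining steps: parsing the retention value, and counting existing names that share the {base_name}_{slot}_ prefix. parse_retention and rotating_name are hypothetical, and the existing list stands in for the stage listing.

```python
from typing import Union

# Assumption: day counts chosen to match the slot thresholds above; the real
# RETENTION_PRESETS mapping is not shown on this page.
ASSUMED_PRESETS = {"7d": 7, "1m": 31, "3m": 93, "6m": 186, "1y": 365}


def parse_retention(retention: Union[str, int]) -> int:
    """Preset string, numeric string or int, resolved as in get_rotating_filename."""
    if isinstance(retention, str):
        days = ASSUMED_PRESETS.get(retention)
        return days if days is not None else int(retention)  # e.g. "2w" -> ValueError
    return retention


def rotating_name(base_name: str, slot: str, extension: str,
                  folder: str, existing: list) -> str:
    """Hypothetical helper: `existing` stands in for the stage listing."""
    prefix = "/".join(filter(None, [folder, f"{base_name}_{slot}_"]))
    # startswith() stands in for the "{prefix}.*" pattern passed to list_files.
    count = sum(1 for path in existing if path.startswith(prefix))
    # The folder scopes the count but is not part of the returned name.
    return f"{base_name}_{slot}_{count}.{extension}"


existing = ["exports/EXPORT_EMPLOYEES_3_0.csv", "exports/EXPORT_EMPLOYEES_4_0.csv"]
print(parse_retention("3m"), parse_retention("30"))
print(rotating_name("EXPORT_EMPLOYEES", "3", "csv", "exports", existing))
```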

load_kwargs(raw_args, required_keys=None, optional_keys=None, resolution=None, keep_all=False)

Resolve stored-procedure arguments, flattening _TASK$key prefixed keys.

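Snowflake task DAGs inject context keys as _TASK_NAME$key. This function normalises them. Root keys (no prefix) take precedence unless overridden by resolution. A key that appears only under prefixes takes the first origin encountered, and a warning is logged when several origins compete. The resolved arguments are also recorded as telemetry span attributes and an sp_start event is emitted, so snowflake.telemetry must be importable.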

Parameters:

  • raw_args (dict, required): Raw dict received by the SP (root keys + _TASK$key keys).
  • required_keys (set[str] | None, default None): Keys that must be present after resolution. None → keep all keys. Empty set → return {}.
  • optional_keys (set[str] | None, default None): Optional keys to resolve as well (ignored when required_keys is None).
  • resolution (dict[str, str] | None, default None): {key: origin} mapping that forces the source when the same key appears under multiple prefixes (e.g. {"env": "_TASK_A"}).
  • keep_all (bool, default False): If True, every prefixed key is also added in {origin}$${key} form, with the origin lower-cased and its leading underscore stripped (e.g. _TASK_A$env → task_a$$env).

Returns:

  • dict: Resolved dict of business keys (plus prefixed keys when keep_all=True).

Raises:

  • ValueError: If a forced origin in resolution is not found, or if required keys are missing after resolution.

Source code in src/pinky_snowpark/snowpark.py
def load_kwargs(
    raw_args: dict,
    required_keys: set[str] | None = None,
    optional_keys: set[str] | None = None,
    resolution: dict[str, str] | None = None,
    keep_all: bool = False,
) -> dict:
    """Resolve stored-procedure arguments, flattening ``_TASK$key`` prefixed keys.

    Snowflake task DAGs inject context keys as ``_TASK_NAME$key``. This function
    normalises them: root keys (no prefix) take precedence unless overridden by
    ``resolution``.

    Args:
        raw_args:      Raw dict received by the SP (root keys + ``_TASK$key`` keys).
        required_keys: Set of keys that must be present after resolution.
                       ``None`` → keep all. Empty set → return ``{}``.
        optional_keys: Set of optional keys to also resolve (ignored if ``required_keys`` is ``None``).
        resolution:    ``{key: origin}`` to force the source when the same key appears
                       under multiple prefixes (e.g. ``{"env": "_TASK_A"}``).
        keep_all:      If ``True``, unresolved prefixed keys are appended in
                       ``task_name$$key`` form.

    Returns:
        Resolved dict of business keys (plus prefixed keys when ``keep_all=True``).

    Raises:
        ValueError: If a forced origin in ``resolution`` is not found, or if
                    required keys are missing after resolution.
    """
    from snowflake import telemetry

    resolution = resolution or {}
    candidates: dict[str, dict[str, Any]] = {}
    prefixed: dict[str, Any] = {}

    for raw_key, value in raw_args.items():
        has_prefix = "$" in raw_key
        value_key = raw_key.split("$")[-1]
        origin = raw_key.split("$")[0] if has_prefix else "_root"

        all_keys = (required_keys or set()) | (optional_keys or set())
        if required_keys is None or value_key in all_keys:
            if value_key not in candidates:
                candidates[value_key] = {}
            candidates[value_key][origin] = value

        if keep_all and has_prefix:
            snake_key = f"{origin.lstrip('_').lower()}$${value_key}"
            prefixed[snake_key] = value

    if required_keys == set():
        return {}

    resolved: dict[str, Any] = {}
    for key, origins in candidates.items():
        if key in resolution:
            forced_origin = resolution[key]
            if forced_origin in origins:
                resolved[key] = origins[forced_origin]
            else:
                raise ValueError(
                    f"Forced origin={forced_origin!r} for key={key!r} not found in origins={list(origins)}"
                )
        elif "_root" in origins:
            resolved[key] = origins["_root"]
        else:
            first = next(iter(origins))
            if len(origins) > 1:
                logger.warning(
                    "Duplicate key=%s, origins=%s, no root/resolution, using=%s",
                    key, list(origins), first,
                )
            resolved[key] = origins[first]

    if required_keys is not None:
        missing_keys = required_keys - resolved.keys()
        if missing_keys:
            raise ValueError(f"Missing required keys in args: {missing_keys}")

    if keep_all:
        resolved.update(prefixed)

    for k, v in to_telemetry_attrs(resolved).items():
        telemetry.set_span_attribute(k, v)
    telemetry.add_event("sp_start", {"arg_count": len(resolved)})

    return resolved
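load_kwargs imports snowflake.telemetry, so it cannot be tried locally as is. The sketch below is a condensed copy of its resolution steps with the telemetry calls and the warning log removed. resolve_args is a hypothetical name. The example that follows shows the precedence rules: a root key beats prefixed ones, and resolution can force a specific task's value.

```python
from typing import Optional


def resolve_args(raw_args: dict, required_keys: Optional[set] = None,
                 optional_keys: Optional[set] = None,
                 resolution: Optional[dict] = None, keep_all: bool = False) -> dict:
    """Condensed copy of load_kwargs' resolution steps, without telemetry or logging."""
    resolution = resolution or {}
    wanted = (required_keys or set()) | (optional_keys or set())
    candidates: dict = {}   # key -> {origin: value}
    prefixed: dict = {}     # "task_a$$key" -> value, only filled when keep_all
    for raw_key, value in raw_args.items():
        has_prefix = "$" in raw_key
        key = raw_key.split("$")[-1]
        origin = raw_key.split("$")[0] if has_prefix else "_root"
        if required_keys is None or key in wanted:
            candidates.setdefault(key, {})[origin] = value
        if keep_all and has_prefix:
            prefixed[f"{origin.lstrip('_').lower()}$${key}"] = value
    if required_keys == set():
        return {}
    resolved: dict = {}
    for key, origins in candidates.items():
        if key in resolution:
            forced = resolution[key]
            if forced not in origins:
                raise ValueError(f"Forced origin={forced!r} for key={key!r} not found")
            resolved[key] = origins[forced]
        elif "_root" in origins:
            resolved[key] = origins["_root"]              # root keys win by default
        else:
            resolved[key] = next(iter(origins.values()))  # first origin seen
    if required_keys is not None:
        missing = required_keys - set(resolved)
        if missing:
            raise ValueError(f"Missing required keys in args: {missing}")
    if keep_all:
        resolved.update(prefixed)
    return resolved


raw = {"env": "DEV", "_TASK_A$env": "QA", "_TASK_A$batch": 7, "_TASK_B$batch": 8}
print(resolve_args(raw))                                 # root env, first batch
print(resolve_args(raw, resolution={"env": "_TASK_A"}))  # forced origin
print(resolve_args(raw, required_keys={"env"}, keep_all=True))
```

In the third call only env is requested. keep_all still carries every prefixed key through in task_name$$key form.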

load_pgp_key(key_filename, stage='@_ADMIN.KEYS')

Load a PGP public or private key from a Snowflake stage.

Must be called from inside a stored procedure or UDF handler where SnowflakeFile is available.

Parameters:

  • key_filename (str, required): Key file name on the stage (e.g. "public.asc").
  • stage (str, default '@_ADMIN.KEYS'): Stage path holding the key files.

Returns:

Type Description
'pgpy.PGPKey'

Loaded pgpy.PGPKey object.

Source code in src/pinky_snowpark/snowpark.py
def load_pgp_key(key_filename: str, stage: str = "@_ADMIN.KEYS") -> "pgpy.PGPKey":
    """Load a PGP public or private key from a Snowflake stage.

    Must be called from inside a stored procedure or UDF handler where
    ``SnowflakeFile`` is available.

    Args:
        key_filename: Key file name on the stage (e.g. ``"public.asc"``).
        stage:        Stage path holding the key files (default ``"@_ADMIN.KEYS"``).

    Returns:
        Loaded ``pgpy.PGPKey`` object.
    """
    import pgpy
    from snowflake.snowpark.files import SnowflakeFile

    with SnowflakeFile.open(f"{stage}/{key_filename}", "rb", require_scoped_url=False) as f:
        key_data = f.read()
    pgp_key, _ = pgpy.PGPKey.from_blob(key_data)
    return pgp_key

run_generated_sql(session, generator_sql)

Execute a SQL string produced by a metadata query.

Runs generator_sql first — it must return a single row with a single column containing the SQL to execute. Then executes that SQL and returns all rows as dicts.

Typical use: build a per-schema UNION via INFORMATION_SCHEMA + LISTAGG, then execute the union to collect values across all schemas.

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • generator_sql (str, required): SQL that returns exactly one string (the SQL to run).

Returns:

  • list[dict]: Rows from the generated SQL as list[dict].

Source code in src/pinky_snowpark/snowpark.py
def run_generated_sql(
    session: "snowpark.Session",
    generator_sql: str,
) -> list[dict]:
    """Execute a SQL string produced by a metadata query.

    Runs *generator_sql* first — it must return a single row with a single
    column containing the SQL to execute. Then executes that SQL and returns
    all rows as dicts.

    Typical use: build a per-schema ``UNION`` via ``INFORMATION_SCHEMA`` +
    ``LISTAGG``, then execute the union to collect values across all schemas.

    Args:
        session:       Active Snowpark session.
        generator_sql: SQL that returns exactly one string (the SQL to run).

    Returns:
        Rows from the generated SQL as ``list[dict]``.
    """
    generated = str(session.sql(generator_sql).collect()[0][0])  # noqa: SNOWPARK-API — generated SQL executor by design
    return [row.as_dict(recursive=True) for row in session.sql(generated).collect()]  # noqa: SNOWPARK-API — generated SQL executor by design
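As an illustration of the "Typical use" described above, the hypothetical helper below builds a generator query for the current database. LISTAGG concatenates one SELECT per schema that contains a given table, joined with UNION ALL. It assumes unquoted (upper-case) identifiers and only sees schemas listed in the current database's INFORMATION_SCHEMA.TABLES. union_count_generator is not part of pinky_snowpark.

```python
def union_count_generator(table_name: str) -> str:
    """Generator SQL: one row, one column holding a UNION ALL over matching schemas."""
    # Unquoted identifiers are stored upper-case; double any quote for a safe literal.
    literal = table_name.upper().replace("'", "''")
    return (
        "SELECT LISTAGG("
        "'SELECT ''' || table_schema || ''' AS SCHEMA_NAME, COUNT(*) AS N FROM '"
        " || table_catalog || '.' || table_schema || '.' || table_name, "
        "' UNION ALL ') WITHIN GROUP (ORDER BY table_schema) "
        "FROM INFORMATION_SCHEMA.TABLES "
        f"WHERE table_name = '{literal}'"
    )


print(union_count_generator("orders"))
```

When passed to run_generated_sql(session, union_count_generator("orders")), the first query returns the UNION ALL text and the second returns one dict per schema with SCHEMA_NAME and N keys. If no schema contains the table, there is nothing to run, and the second step is likely to fail.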

sp_trace(v_result, v_return_values=None, *, status='ok')

Emit SP telemetry at exit — call in a finally block.

Symmetric to the set_span_attribute calls in load_kwargs. Each step entry is flattened into individual span attributes, so they are directly queryable in EVENT_TABLE without JSON parsing.

Numeric values (rows_updated, total_rows, …) are emitted with their native int type, so they can be promoted to metrics once snowflake.telemetry exposes add_metric.

Parameters:

  • v_result (list[dict], required): List of step dicts accumulated during SP execution (e.g. [{"rows_synced": 12}, {"offset": 0, "rows_updated": 500}]).
  • v_return_values (dict | None, default None): Final return dict exposed in the SP result (optional).
  • status (str, default 'ok'): "ok" (default) or "error". Set to "error" in the except block before re-raising so the span reflects a failed execution.

Example::

status = "ok"  # must be bound before the finally block reads it
v_result: list = []
v_return_values: dict = {}
try:
    # … SP logic …
    v_result.append({"rows_updated": result.rows_updated})
    v_return_values["continue"] = result.rows_updated > 0
    return {"continue": 1, "return_values": v_return_values}
except Exception:
    status = "error"
    raise
finally:
    sp_trace(v_result, v_return_values, status=status)
Source code in src/pinky_snowpark/snowpark.py
def sp_trace(
    v_result: list[dict],
    v_return_values: dict | None = None,
    *,
    status: str = "ok",
) -> None:
    """Emit SP telemetry at exit — call in a ``finally`` block.

    Symmetric to the ``set_span_attribute`` calls in :func:`load_kwargs`.
    Each step entry is flattened to individual span attributes so they are
    directly queryable in ``EVENT_TABLE`` without JSON parsing.

    Numeric values (``rows_updated``, ``total_rows``, …) are emitted with
    their native ``int`` type — they are metrics candidates once
    ``snowflake.telemetry`` exposes ``add_metric``.

    Args:
        v_result:        List of step dicts accumulated during SP execution
                         (e.g. ``[{"rows_synced": 12}, {"offset": 0, "rows_updated": 500}]``).
        v_return_values: Final return dict exposed in the SP result (optional).
        status:          ``"ok"`` (default) or ``"error"``. Set to ``"error"``
                         in the ``except`` block before re-raising so the span
                         reflects a failed execution.

    Example::

        status = "ok"  # must be bound before the finally block reads it
        v_result: list = []
        v_return_values: dict = {}
        try:
            # … SP logic …
            v_result.append({"rows_updated": result.rows_updated})
            v_return_values["continue"] = result.rows_updated > 0
            return {"continue": 1, "return_values": v_return_values}
        except Exception:
            status = "error"
            raise
        finally:
            sp_trace(v_result, v_return_values, status=status)
    """
    from snowflake import telemetry

    for i, step in enumerate(v_result):
        for k, v in to_telemetry_attrs(step).items():
            telemetry.set_span_attribute(f"step_{i}_{k}", v)
    if v_return_values:
        for k, v in to_telemetry_attrs(v_return_values).items():
            telemetry.set_span_attribute(f"return_{k}", v)
    telemetry.add_event("sp_end", {"status": status})
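To see which span attributes sp_trace produces, here is a stdlib-only sketch of its naming scheme: step_{i}_{key} for each step and return_{key} for the return dict. It assumes every value is already a scalar, so to_telemetry_attrs would pass it through unchanged, and it leaves out the sp_end event. trace_attributes is a hypothetical helper and is not part of the module.

```python
from __future__ import annotations

from typing import Any


def trace_attributes(
    v_result: list[dict[str, Any]],
    v_return_values: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Return the span attributes sp_trace would set, for scalar-only inputs."""
    attrs: dict[str, Any] = {}
    for i, step in enumerate(v_result):
        for key, value in step.items():
            attrs[f"step_{i}_{key}"] = value  # step index keeps keys unique across steps
    if v_return_values:  # None and {} both emit nothing, as in sp_trace
        for key, value in v_return_values.items():
            attrs[f"return_{key}"] = value
    return attrs


print(trace_attributes(
    [{"rows_synced": 12}, {"offset": 0, "rows_updated": 500}],
    {"continue": True},
))
# {'step_0_rows_synced': 12, 'step_1_offset': 0, 'step_1_rows_updated': 500, 'return_continue': True}
```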

stage_list(session, stage, pattern=None)

List files in a Snowflake stage and return a structured DataFrame.

Runs LIST @{stage} and enriches the result with computed path and URL columns. LAST_MODIFIED is parsed to TIMESTAMP_LTZ.

Extra columns added beyond the raw LIST output:

  • STAGE_TYPE: "INTERNAL" | "EXTERNAL" | "GIT".
  • STAGE_PREFIX: prefix present at the start of NAME; strip it to get the relative path. Internal stages: the unqualified stage name in lower case followed by "/"; external and Git stages: the stage URL followed by "/".
  • RELATIVE_PATH: path within the stage, without the prefix.
  • STAGE_FILE_URL: stable @stage/relative_path URL (from fl_get_stage_file_url). Wrap it as to_file(col("STAGE_FILE_URL")) to apply other fl_* functions (fl_get_scoped_file_url, fl_get_content_type, fl_is_image, …) in a subsequent select.

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • stage (str, required): Stage name, with or without @, qualified or not (e.g. "MY_STAGE" or "DB.SCHEMA.MY_STAGE").
  • pattern (str | None, default None): Optional regex forwarded to LIST … PATTERN (e.g. ".*\\.csv").

Returns:

  • 'snowpark.DataFrame': DataFrame with columns NAME, SIZE, MD5, LAST_MODIFIED (TIMESTAMP_LTZ), SHA1 (populated for GIT stages, NULL otherwise), STAGE_TYPE, STAGE_PREFIX, RELATIVE_PATH, STAGE_FILE_URL.

Source code in src/pinky_snowpark/snowpark.py
def stage_list(
    session: "snowpark.Session",
    stage: str,
    pattern: str | None = None,
) -> "snowpark.DataFrame":
    """List files in a Snowflake stage and return a structured DataFrame.

    Runs ``LIST @{stage}`` and enriches the result with computed path and URL
    columns. ``LAST_MODIFIED`` is parsed to ``TIMESTAMP_LTZ``.

    Extra columns added beyond the raw LIST output:

    - ``STAGE_TYPE``     — ``"INTERNAL"`` | ``"EXTERNAL"`` | ``"GIT"``
    - ``STAGE_PREFIX``   — prefix present at the start of ``NAME``; strip it to
      get the relative path. Internal: lower-cased stage name + ``"/"``; external/git: URL prefix.
    - ``RELATIVE_PATH``  — path within the stage, without the prefix.
    - ``STAGE_FILE_URL`` — stable ``@stage/relative_path`` URL (from
      ``fl_get_stage_file_url``). Use ``to_file(col("STAGE_FILE_URL"))`` to
      apply other ``fl_*`` functions (``fl_get_scoped_file_url``,
      ``fl_get_content_type``, ``fl_is_image``, …) in a subsequent select.

    Args:
        session: Active Snowpark session.
        stage:   Stage name — with or without ``@``, qualified or not
                 (e.g. ``"MY_STAGE"`` or ``"DB.SCHEMA.MY_STAGE"``).
        pattern: Optional regex forwarded to ``LIST … PATTERN`` (e.g. ``".*\\\\.csv"``).

    Returns:
        DataFrame with columns ``NAME``, ``SIZE``, ``MD5``, ``LAST_MODIFIED``
        (``TIMESTAMP_LTZ``), ``SHA1`` (populated for GIT stages, ``NULL`` otherwise),
        ``STAGE_TYPE``, ``STAGE_PREFIX``, ``RELATIVE_PATH``, ``STAGE_FILE_URL``.
    """
    from snowflake.core import Root
    from snowflake.snowpark.functions import (
        col,
        concat,
        fl_get_stage_file_url,
        lit,
        substring,
        to_file,
        to_timestamp_ltz,
        to_timestamp_tz,
    )

    stage_clean = stage.lstrip("@")

    db = str(session.get_current_database()).replace('"', '')
    schema = str(session.get_current_schema()).replace('"', '')
    root = Root(session)
    parts = stage_clean.split(".")
    sname = parts[-1]
    db_ref = db if len(parts) < 3 else parts[0]
    schema_ref = schema if len(parts) < 2 else (parts[-2] if len(parts) == 2 else parts[1])

    stage_obj = root.databases[db_ref].schemas[schema_ref].stages[sname].fetch()
    url = (stage_obj.url or "").rstrip("/")

    if not url:
        stage_type = "INTERNAL"
        stage_prefix = sname.lower() + "/"
    elif url.startswith("https://"):
        stage_type = "GIT"
        stage_prefix = url + "/"
    else:
        stage_type = "EXTERNAL"
        stage_prefix = url + "/"

    stage_files = session.file.list(f"@{stage_clean}", pattern=pattern)
    if not stage_files:
        from snowflake.snowpark.types import (
            LongType,
            StringType,
            StructField,
            StructType,
            TimestampType,
        )
        return session.create_dataframe(
            [],
            schema=StructType([
                StructField("NAME", StringType()),
                StructField("SIZE", LongType()),
                StructField("MD5", StringType()),
                StructField("SHA1", StringType()),
                StructField("LAST_MODIFIED", TimestampType()),
                StructField("STAGE_TYPE", StringType()),
                StructField("STAGE_PREFIX", StringType()),
                StructField("RELATIVE_PATH", StringType()),
                StructField("STAGE_FILE_URL", StringType()),
            ]),
        )

    relative_path_col = substring(col("NAME"), lit(len(stage_prefix) + 1))
    file_col = to_file(concat(lit(f"@{stage_clean}/"), relative_path_col))

    return (
        session.create_dataframe(stage_files)
        .with_column(
            "LAST_MODIFIED",
            to_timestamp_ltz(
                to_timestamp_tz(
                    concat(col("LAST_MODIFIED"), lit(" +0000")),
                    "DY, DD MON YYYY HH24:MI:SS GMT TZHTZM",
                )
            ),
            keep_column_order=True,
        )
        .with_column("STAGE_TYPE", lit(stage_type))
        .with_column("STAGE_PREFIX", lit(stage_prefix))
        .with_column("RELATIVE_PATH", relative_path_col)
        .with_column("STAGE_FILE_URL", fl_get_stage_file_url(file_col))
    )
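The prefix and timestamp handling in stage_list can be mirrored in plain Python, which helps when checking the RELATIVE_PATH and LAST_MODIFIED values that LIST returns. This sketch assumes the same stage-type rules as the code above and the LIST date layout implied by its format string (e.g. "Tue, 20 Jun 2023 17:34:29 GMT"). classify_stage, relative_path and parse_last_modified are hypothetical names, and %a/%b parsing assumes an English (C) locale.

```python
from __future__ import annotations

from datetime import datetime, timezone


def classify_stage(stage: str, url: str | None) -> tuple[str, str]:
    """Return (STAGE_TYPE, STAGE_PREFIX) following the rules in stage_list."""
    url = (url or "").rstrip("/")
    if not url:
        # Internal stages: LIST names start with the lower-cased, unqualified stage name.
        return "INTERNAL", stage.lstrip("@").split(".")[-1].lower() + "/"
    kind = "GIT" if url.startswith("https://") else "EXTERNAL"
    return kind, url + "/"


def relative_path(name: str, prefix: str) -> str:
    # SQL SUBSTRING is 1-based, so SUBSTRING(NAME, len(prefix) + 1) == name[len(prefix):].
    return name[len(prefix):]


def parse_last_modified(value: str) -> datetime:
    # stage_list appends " +0000", i.e. it treats the LIST value as UTC.
    parsed = datetime.strptime(value, "%a, %d %b %Y %H:%M:%S GMT")
    return parsed.replace(tzinfo=timezone.utc)


kind, prefix = classify_stage("@DB.SCH.MY_STAGE", None)
print(kind, prefix, relative_path("my_stage/exports/a.csv", prefix))
# INTERNAL my_stage/ exports/a.csv
```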

stage_scoped_file_url(session, stage, file_path)

Return a scoped URL for a file inside a Snowflake stage.

Calls BUILD_SCOPED_FILE_URL, which has no Python API equivalent; use this helper instead of calling session.sql() directly.

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • stage (str, required): Stage name (e.g. "@MY_STAGE").
  • file_path (str, required): Relative path of the file within the stage.

Returns:

  • str: Scoped URL string suitable for SnowflakeFile.open().

Source code in src/pinky_snowpark/snowpark.py
def stage_scoped_file_url(session: "snowpark.Session", stage: str, file_path: str) -> str:
    """Return a scoped URL for a file inside a Snowflake stage.

    Calls ``BUILD_SCOPED_FILE_URL`` which has no Python API equivalent.
    Use this helper instead of calling ``session.sql()`` directly.

    Args:
        session:   Active Snowpark session.
        stage:     Stage name (e.g. ``"@MY_STAGE"``).
        file_path: Relative path of the file within the stage.

    Returns:
        Scoped URL string suitable for ``SnowflakeFile.open()``.
    """
    return str(
        session.sql(  # noqa: SNOWPARK-API — BUILD_SCOPED_FILE_URL has no Python API equivalent
            f"SELECT BUILD_SCOPED_FILE_URL('{stage}', '{file_path}') AS FILE_URL"
        ).collect()[0][0]
    )

stage_write(session, data, stage, folder, file_name, encoding='utf-8', overwrite=True, file_format=None, pgp_key=None, keep_plain_stage=None, registry_table=None)

Write a file to a Snowflake stage from memory.

Supports DataFrames (CSV, JSON, Parquet, Avro), openpyxl Workbooks, raw strings, and raw bytes. Optional PGP encryption is applied before upload.

DataFrame serialization is controlled by file_format:

  • CsvFormat: collected locally, written with the format's delimiter, lineterminator, quoting and header settings (or one value per line in positional mode), then PUT.
  • JsonFormat: collected locally, written as NDJSON, then PUT. PGP-compatible.
  • ParquetFormat: written directly to stage via DataFrameWriter. PGP not supported.
  • AvroFormat: written directly to stage via DataFrameWriter. PGP not supported.

file_format is ignored for non-DataFrame data (str, bytes, Workbook) — those are already serialized; only pgp_key applies.
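For DataFrame input with CsvFormat or JsonFormat, the rows are serialized locally before the PUT. Below is a rough stdlib sketch of that serialization, writing to a string instead of /tmp. It assumes rows arrive as plain tuples and dicts rather than Snowpark Row objects. CsvOptions is a hypothetical stand-in that mirrors the documented CsvFormat fields and defaults.

```python
import csv
import io
import json
from dataclasses import dataclass


@dataclass
class CsvOptions:
    """Hypothetical stand-in mirroring the documented CsvFormat fields and defaults."""
    delimiter: str = ","
    lineterminator: str = "\n"
    quoting: int = csv.QUOTE_MINIMAL
    header: bool = False
    positional: bool = False


def to_csv_text(columns: list, rows: list, fmt: CsvOptions) -> str:
    buf = io.StringIO()
    if fmt.positional:
        # Positional mode: str() of the first column only, no csv.writer involved.
        for row in rows:
            buf.write(str(row[0]) + fmt.lineterminator)
        return buf.getvalue()
    writer = csv.writer(buf, delimiter=fmt.delimiter,
                        lineterminator=fmt.lineterminator, quoting=fmt.quoting)
    if fmt.header and rows:  # like stage_write, no header line for an empty result
        writer.writerow(columns)
    writer.writerows(rows)
    return buf.getvalue()


def to_ndjson(records: list) -> str:
    # One JSON object per line; default=str stringifies dates, Decimals, etc.
    return "\n".join(json.dumps(r, default=str) for r in records)


rows = [(1, "a|b"), (2, "c")]
print(to_csv_text(["ID", "NAME"], rows, CsvOptions(delimiter="|", header=True)), end="")
# ID|NAME
# 1|"a|b"
# 2|c
```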

The registry feature is opt-in: pass registry_table to append file metadata (path, MD5, content hash, size, last modified) to an existing Snowflake table after each upload. Useful for transfer monitoring and embedded regression testing (content hash comparison across runs).
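The two content hashes recorded in the registry treat row order differently. Here is a simplified sketch, assuming CSV-style rows: the first hash covers the rows as given, and the second covers them sorted with the same key stage_write uses (str(list(row))). In stage_write itself, the unsorted hash is taken over the bytes of the written file, including the header and the configured line terminator, so real values will not match this sketch byte for byte. content_hashes is a hypothetical name.

```python
import csv
import hashlib
import io


def content_hashes(rows: list, delimiter: str = ",") -> tuple:
    """Return (hash of rows as given, hash of rows in sorted order)."""

    def digest(ordered: list) -> str:
        buf = io.StringIO()
        csv.writer(buf, delimiter=delimiter, lineterminator="\n").writerows(ordered)
        return hashlib.sha256(buf.getvalue().encode("utf-8")).hexdigest()

    # Same sort key as stage_write: the string form of each row as a list.
    return digest(rows), digest(sorted(rows, key=lambda r: str(list(r))))


run_1 = content_hashes([(1, "x"), (2, "y")])
run_2 = content_hashes([(2, "y"), (1, "x")])
print(run_1[0] == run_2[0], run_1[1] == run_2[1])
# False True
```

A query without ORDER BY may return rows in a different order on each run. Comparing the sorted hash across runs therefore flags genuine data changes rather than reordering.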

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • data ('snowpark.DataFrame | Workbook | str | bytes', required): Data to write: a Snowpark DataFrame, an openpyxl Workbook, str, or bytes. Use str / bytes for any format the caller serializes itself (XML, fixed-width, custom text…). Example for XML::

              ET.tostring(root, encoding="unicode")         # → str
              ET.tostring(root, encoding="utf-8",
                          xml_declaration=True)             # → bytes

  • stage (str, required): Destination stage name (e.g. "MY_STAGE" or "@DB.SCHEMA.MY_STAGE").
  • folder (str, required): Sub-folder path within the stage (e.g. "exports/jan"). Leading and trailing "/" are stripped.
  • file_name (str, required): Target file name (e.g. "report.csv").
  • encoding (str, default 'utf-8'): Text encoding.
  • overwrite (bool, default True): Overwrite an existing file.
  • file_format ('StageFileFormat | None', default None): Serialization config for DataFrame data. None defaults to CsvFormat() (comma-separated, no header). Use the module-level presets (CSV_PIPE, CSV_TAB, …) or build a custom CsvFormat directly. Ignored for non-DataFrame data.
  • pgp_key ('pgpy.PGPKey | None', default None): Public PGP key for encryption (None = no encryption). Not compatible with ParquetFormat / AvroFormat.
  • keep_plain_stage (str | None, default None): Stage that receives an unencrypted copy of the file, uploaded to the stage root. Ignored when pgp_key is None, and skipped when the last "_"-separated part of the current database name is "PRODUCTION".
  • registry_table (str | None, default None): Fully-qualified table name to append file metadata to (e.g. "MY_DB.MY_SCHEMA.MY_STAGE_REGISTRY"). None (the default) disables the registry. Not used for ParquetFormat / AvroFormat output, which returns before the registry step.

Returns:

  • str: Final file name written to the stage (with .asc suffix if PGP-encrypted).

Raises:

  • ValueError: If file_format is unsupported, if PGP is requested with ParquetFormat / AvroFormat, or if pgp_key has no public key component.
  • TypeError: If data is not a supported type.

Source code in src/pinky_snowpark/snowpark.py
def stage_write(
    session: "snowpark.Session",
    data: "snowpark.DataFrame | Workbook | str | bytes",
    stage: str,
    folder: str,
    file_name: str,
    encoding: str = "utf-8",
    overwrite: bool = True,
    file_format: "StageFileFormat | None" = None,
    pgp_key: "pgpy.PGPKey | None" = None,
    keep_plain_stage: str | None = None,
    registry_table: str | None = None,
) -> str:
    """Write a file to a Snowflake stage from memory.

    Supports DataFrames (CSV, JSON, Parquet, Avro), openpyxl Workbooks, raw
    strings, and raw bytes. Optional PGP encryption is applied before upload.

    DataFrame serialization is controlled by ``file_format``:

    - ``CsvFormat``     — collected locally, written with preset dialect options, then PUT.
    - ``JsonFormat``    — collected locally, written as NDJSON, then PUT. PGP-compatible.
    - ``ParquetFormat`` — written directly to stage via ``DataFrameWriter``. PGP not supported.
    - ``AvroFormat``    — written directly to stage via ``DataFrameWriter``. PGP not supported.

    ``file_format`` is ignored for non-DataFrame ``data`` (``str``, ``bytes``,
    ``Workbook``) — those are already serialized; only ``pgp_key`` applies.

    The registry feature is opt-in: pass ``registry_table`` to append file
    metadata (path, MD5, content hash, size, last modified) to an existing
    Snowflake table after each upload. Useful for transfer monitoring and
    embedded regression testing (content hash comparison across runs).

    Args:
        session:          Active Snowpark session.
        data:             Data to write — Snowpark DataFrame, openpyxl Workbook,
                          ``str``, or ``bytes``. Use ``str`` / ``bytes`` for any
                          format the caller serializes itself (XML, fixed-width,
                          custom text…). Example for XML::

                              ET.tostring(root, encoding="unicode")         # → str
                              ET.tostring(root, encoding="utf-8",
                                          xml_declaration=True)             # → bytes
        stage:            Destination stage name (e.g. ``"MY_STAGE"`` or
                          ``"@DB.SCHEMA.MY_STAGE"``).
        folder:           Sub-folder path within the stage (e.g. ``"exports/jan"``).
        file_name:        Target file name (e.g. ``"report.csv"``).
        encoding:         Text encoding (default ``"utf-8"``).
        overwrite:        Overwrite existing file (default ``True``).
        file_format:      Serialization config for DataFrame data. ``None`` defaults
                          to ``CsvFormat()`` (comma-separated, no header).
                          Use module-level presets (``CSV_PIPE``, ``CSV_TAB``, …)
                          or build a custom ``CsvFormat`` directly.
                          Ignored for non-DataFrame ``data``.
        pgp_key:          Public PGP key for encryption (``None`` = no encryption).
                          Not compatible with ``ParquetFormat`` / ``AvroFormat``.
        keep_plain_stage: Stage name to keep an unencrypted copy (non-PROD only,
                          ignored when ``pgp_key`` is ``None``).
        registry_table:   Fully-qualified table name to append file metadata to
                          (e.g. ``"MY_DB.MY_SCHEMA.MY_STAGE_REGISTRY"``).
                          ``None`` = no registry (default).

    Returns:
        Final file name written to the stage (with ``.asc`` suffix if PGP-encrypted).

    Raises:
        ValueError: If the format is unsupported, or PGP is requested with
                    ``ParquetFormat`` / ``AvroFormat``.
        TypeError:  If ``data`` is not a supported type.
    """
    import snowflake.snowpark as _snowpark

    if file_format is None:
        file_format = CsvFormat()

    folder = folder.strip("/")
    stage = stage.lstrip("@")  # accept "MY_STAGE" and "@DB.SCHEMA.MY_STAGE"; "@" is re-added below
    local_path = f"/tmp/{file_name}"
    environment = str(session.get_current_database()).replace('"', "").split("_")[-1]
    content_hash_override: str | None = None
    content_hash_sorted_override: str | None = None

    if isinstance(data, _snowpark.DataFrame):
        if isinstance(file_format, (ParquetFormat, AvroFormat)):
            if pgp_key:
                raise ValueError(f"PGP encryption is not supported for {type(file_format).__name__}")
            fmt_name = "parquet" if isinstance(file_format, ParquetFormat) else "avro"
            dw_mode = "overwrite" if overwrite else "errorifexists"
            data.write.format(fmt_name).mode(dw_mode).save(f"@{stage}/{folder}/{file_name}")
            return file_name
        elif isinstance(file_format, JsonFormat):
            rows = data.collect()
            ndjson = "\n".join(json.dumps(row.as_dict(recursive=True), default=str) for row in rows)
            content_hash_override = hashlib.sha256(ndjson.encode(encoding)).hexdigest()
            content_hash_sorted_override = hashlib.sha256(
                "\n".join(sorted(ndjson.splitlines())).encode(encoding)
            ).hexdigest()
            with open(local_path, "w", encoding=encoding) as f:
                f.write(ndjson)
        elif isinstance(file_format, CsvFormat):
            rows = data.collect()
            if file_format.positional:
                with open(local_path, "w", encoding=encoding, newline="") as f:
                    for row in rows:
                        f.write(str(row[0]) + file_format.lineterminator)
            else:
                col_names = [f.name.strip('"') for f in data.schema.fields]
                if rows:
                    sorted_buf = StringIO()
                    w_sorted = csv.writer(
                        sorted_buf,
                        delimiter=file_format.delimiter,
                        lineterminator="\n",
                        quoting=cast("csv._QuotingType", file_format.quoting),
                    )
                    for row in sorted(rows, key=lambda r: str(list(r))):
                        w_sorted.writerow(list(row))
                    content_hash_sorted_override = hashlib.sha256(sorted_buf.getvalue().encode(encoding)).hexdigest()
                with open(local_path, "w", encoding=encoding, newline="") as f:
                    writer = csv.writer(
                        f,
                        delimiter=file_format.delimiter,
                        lineterminator=file_format.lineterminator,
                        quoting=cast("csv._QuotingType", file_format.quoting),
                    )
                    if file_format.header and rows:
                        writer.writerow(col_names)
                    for row in rows:
                        writer.writerow(list(row))
        else:
            raise ValueError(f"Unsupported file_format type: {type(file_format)}")
    elif isinstance(data, bytes):
        with open(local_path, "wb") as f:
            f.write(data)
    elif isinstance(data, str):
        with open(local_path, "w", encoding=encoding) as f:
            f.write(data)
    else:
        from openpyxl import Workbook as _Workbook
        if not isinstance(data, _Workbook):
            raise TypeError(f"Unsupported data type: {type(data)}")
        cell_rows = [
            tuple(str(cell.value) if cell.value is not None else "" for cell in row)
            for ws in data.worksheets
            for row in ws.iter_rows(min_row=2)
        ]
        content_hash_override = hashlib.sha256(
            "\n".join(",".join(r) for r in cell_rows).encode("utf-8")
        ).hexdigest()
        content_hash_sorted_override = hashlib.sha256(
            "\n".join(",".join(r) for r in sorted(cell_rows)).encode("utf-8")
        ).hexdigest()
        data.save(local_path)

    with open(local_path, "rb") as f:
        plain_bytes = f.read()
    content_hash = content_hash_override if content_hash_override is not None else hashlib.sha256(plain_bytes).hexdigest()
    content_hash_sorted = content_hash_sorted_override if content_hash_sorted_override is not None else content_hash

    if pgp_key:
        if keep_plain_stage and environment != "PRODUCTION":
            session.file.put(
                local_file_name=f"file://{local_path}",
                stage_location=f"@{keep_plain_stage}",
                auto_compress=False,
                overwrite=overwrite,
            )
        with open(local_path, "rb") as f:
            raw = f.read()
        import pgpy
        if pgp_key.pubkey is None:
            raise ValueError(f"PGP key {pgp_key.fingerprint} has no public key component")
        encrypted = pgp_key.pubkey.encrypt(pgpy.PGPMessage.new(raw))
        local_path = f"{local_path}.asc"
        file_name = f"{file_name}.asc"
        with open(local_path, "wb") as f:
            f.write(bytes(encrypted))

    session.file.put(
        local_file_name=f"file://{local_path}",
        stage_location=f"@{stage}/{folder}",
        auto_compress=False,
        overwrite=overwrite,
    )

    if registry_table:
        try:
            _registry_append(
                session=session,
                stage=stage,
                folder=folder,
                file_name=file_name,
                content_hash=content_hash,
                content_hash_sorted=content_hash_sorted,
                table_name=registry_table,
            )
        except Exception:
            logger.warning("Registry append failed for %s/%s", stage, file_name, exc_info=True)

    return file_name

to_telemetry_attrs(data)

Normalise a dict or list for snowflake.telemetry.add_event.

Telemetry only accepts str | int | float | bool | bytes | None. The rules applied are:

  • bool / int / float / bytes → passed through unchanged.
  • None → "" (empty string).
  • list[dict] with ≤ 5 items → expanded as {k}_item_0 … {k}_item_n (each JSON-serialised, truncated to 500 chars).
  • list with > 5 items, or list of non-dict → {k} = count (int), {k}_sample = str of the first 3 elements.
  • dict → JSON-serialised, truncated to 500 chars.
  • anything else → str().

If data is a list, it is first converted to {"item_0": …, "item_n": …}.

Parameters:

  • data (dict | list, required): Dict or list whose values may be of any type.

Returns:

  • dict: Dict whose values are all telemetry-compatible.

Examples:

>>> to_telemetry_attrs({"files": [{"NAME": "a.csv"}], "count": 1})
{'files_item_0': '{"NAME": "a.csv"}', 'count': 1}
>>> to_telemetry_attrs([{"NAME": "a.csv"}])
{'item_0': '{"NAME": "a.csv"}'}
Source code in src/pinky_snowpark/snowpark.py
def to_telemetry_attrs(data: dict | list) -> dict:
    """Normalise a dict or list for ``snowflake.telemetry.add_event``.

    Telemetry only accepts ``str | int | float | bool | bytes | None``.
    The rules applied are:

    - ``bool / int / float / bytes`` → passed through unchanged.
    - ``None`` → ``""`` (empty string).
    - ``list[dict]`` with ≤ 5 items → expanded as ``{k}_item_0 … {k}_item_n``
      (each JSON-serialised, truncated to 500 chars).
    - ``list`` with > 5 items, or ``list`` of non-dict → ``{k}`` = count (int),
      ``{k}_sample`` = str of the first 3 elements.
    - ``dict`` → JSON-serialised, truncated to 500 chars.
    - anything else → ``str()``.

    If ``data`` is a list, it is first converted to ``{"item_0": …, "item_n": …}``.

    Args:
        data: Dict or list whose values may be of any type.

    Returns:
        Dict whose values are all telemetry-compatible.

    Examples:
        >>> to_telemetry_attrs({"files": [{"NAME": "a.csv"}], "count": 1})
        {'files_item_0': '{"NAME": "a.csv"}', 'count': 1}

        >>> to_telemetry_attrs([{"NAME": "a.csv"}])
        {'item_0': '{"NAME": "a.csv"}'}
    """
    if isinstance(data, list):
        data = {f"item_{i}": v for i, v in enumerate(data)}

    result: dict[str, Any] = {}
    for k, v in data.items():
        if isinstance(v, bool | int | float | bytes):
            result[k] = v
        elif v is None:
            result[k] = ""
        elif isinstance(v, list):
            if v and isinstance(v[0], dict) and len(v) <= _TELEMETRY_EXPAND_THRESHOLD:
                for i, item in enumerate(v):
                    s = json.dumps(item, ensure_ascii=False, default=str)
                    result[f"{k}_item_{i}"] = s[:500] + "…" if len(s) > 500 else s
            else:
                result[k] = len(v)
                result[f"{k}_sample"] = str(v[:3])
        elif isinstance(v, dict):
            s = json.dumps(v, ensure_ascii=False, default=str)
            result[k] = s[:500] + "…" if len(s) > 500 else s
        else:
            result[k] = str(v)
    return result

validate_sql(session, statement)

Dry-run a SQL statement via EXPLAIN without executing it.

Validates syntax and object references (tables, columns, UDFs) without producing any side effects. Works for SELECT, INSERT, UPDATE, DELETE, MERGE, and CREATE TABLE AS SELECT.

Raises the underlying SnowparkSQLException on invalid SQL so the caller can catch it and surface a clean error message.

Parameters:

  • session ('snowpark.Session', required): Active Snowpark session.
  • statement (str, required): SQL statement to validate (without trailing ;).

Returns:

  • bool: Always True; an invalid statement raises instead of returning False.

Raises:

  • SnowparkSQLException: If the statement is syntactically invalid or references unknown objects.

Source code in src/pinky_snowpark/snowpark.py
def validate_sql(session: "snowpark.Session", statement: str) -> bool:
    """Dry-run a SQL statement via ``EXPLAIN`` without executing it.

    Validates syntax and object references (tables, columns, UDFs) without
    producing any side effects. Works for SELECT, INSERT, UPDATE, DELETE,
    MERGE, and CREATE TABLE AS SELECT.

    Raises the underlying ``SnowparkSQLException`` on invalid SQL so the
    caller can catch it and surface a clean error message.

    Args:
        session:   Active Snowpark session.
        statement: SQL statement to validate (without trailing ``;``).

    Returns:
        ``True`` if the statement is valid.

    Raises:
        snowflake.snowpark.exceptions.SnowparkSQLException: If the statement
            is syntactically invalid or references unknown objects.
    """
    session.sql(f"EXPLAIN {statement}").collect()  # noqa: SNOWPARK-API — no Core API equivalent for EXPLAIN
    return True
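The statement parameter must not end with ;, and invalid SQL surfaces as SnowparkSQLException. A caller that accepts pasted SQL might combine both points in a small wrapper such as this sketch. normalize_for_explain and sql_error_message are hypothetical helpers, not part of pinky_snowpark; the imports inside sql_error_message are lazy, mirroring this module's convention, so only calling that function requires Snowpark to be installed.

```python
import re
from typing import Any, Optional

# Whitespace and semicolons at the very end, e.g. "SELECT 1;  " or "SELECT 1 ; ;".
_TRAILING_TERMINATORS = re.compile(r"[\s;]+$")


def normalize_for_explain(statement: str) -> str:
    """Trim whitespace and drop trailing ';' so the text can follow EXPLAIN."""
    return _TRAILING_TERMINATORS.sub("", statement.strip())


def sql_error_message(session: Any, statement: str) -> Optional[str]:
    """Return None if the statement compiles, otherwise the error text.

    Hypothetical helper. The Snowpark and pinky_snowpark imports are lazy so
    this file imports cleanly without a Snowflake installation.
    """
    from snowflake.snowpark.exceptions import SnowparkSQLException
    from pinky_snowpark.snowpark import validate_sql

    try:
        validate_sql(session, normalize_for_explain(statement))
    except SnowparkSQLException as exc:
        # Surface the server's message instead of a traceback.
        return str(exc)
    return None
```

A CLI could print the returned message and exit non-zero whenever sql_error_message(session, user_sql) is not None.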

with_role(session, role)

Context manager that temporarily switches the session to role.

Restores the original role on exit, even if an exception is raised. Useful for testing permission boundaries or executing a block as a specific role without creating a separate session.

Parameters:

Name     Type                Description                                                    Default
session  'snowpark.Session'  Active Snowpark session.                                       required
role     str                 Role name to switch to (must be granted to the current user).  required

Yields:

Type                Description
'snowpark.Session'  The same session object, now running as role.

Raises:

Type                  Description
ValueError            If role is not a valid Snowflake identifier (raised before any role switch).
SnowparkSQLException  If the role does not exist or is not granted to the current user.

Example::

with with_role(session, "DATA_ANALYST") as s:
    df = s.sql("SELECT * FROM sensitive_view").collect()
# back to original role here
Source code in src/pinky_snowpark/snowpark.py
@contextmanager
def with_role(
    session: "snowpark.Session",
    role: str,
) -> "Generator[snowpark.Session, None, None]":
    """Context manager that temporarily switches the session to *role*.

    Restores the original role on exit, even if an exception is raised.
    Useful for testing permission boundaries or executing a block as a
    specific role without creating a separate session.

    Args:
        session: Active Snowpark session.
        role:    Role name to switch to (must be granted to the current user).

    Yields:
        The same *session* object, now running as *role*.

    Raises:
        ValueError:  If *role* is not a valid Snowflake identifier.
        snowflake.snowpark.exceptions.SnowparkSQLException: If the role does
            not exist or is not granted to the current user.

    Example::

        with with_role(session, "DATA_ANALYST") as s:
            df = s.sql("SELECT * FROM sensitive_view").collect()
        # back to original role here
    """
    from pinky_core.security import validate_identifier

    validate_identifier(role)
    original = str(session.get_current_role()).strip('"')
    try:
        session.use_role(role)
        yield session
    finally:
        session.use_role(original)
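Restoring the role in a finally clause is what keeps with_role safe when the block raises. This sketch reproduces that save, switch, and restore sequence against FakeSession, a hypothetical in-memory stand-in for snowpark.Session (PermissionError plays the part of SnowparkSQLException), so the behavior can be exercised without a Snowflake connection. switched_role is likewise a hypothetical name, and it omits the identifier validation that with_role performs.

```python
from contextlib import contextmanager
from typing import Iterator, List, Set


class FakeSession:
    """Hypothetical in-memory stand-in for snowpark.Session (illustration only)."""

    def __init__(self, current: str, granted: Set[str]) -> None:
        self.current = current
        self.granted = granted
        self.history: List[str] = []  # roles successfully activated, in order

    def get_current_role(self) -> str:
        # Returned quoted, the form with_role strips with .strip('"').
        return f'"{self.current}"'

    def use_role(self, role: str) -> None:
        # PermissionError stands in for SnowparkSQLException here.
        if role not in self.granted:
            raise PermissionError(f"role {role} is not granted")
        self.history.append(role)
        self.current = role


@contextmanager
def switched_role(session: FakeSession, role: str) -> Iterator[FakeSession]:
    """Save, switch, restore: the try/finally shape used by with_role.

    Unlike with_role, this sketch skips identifier validation.
    """
    original = str(session.get_current_role()).strip('"')
    try:
        session.use_role(role)
        yield session
    finally:
        # Runs on normal exit, on an exception in the block,
        # and even when use_role(role) itself failed.
        session.use_role(original)


session = FakeSession("SYSADMIN", granted={"SYSADMIN", "DATA_ANALYST"})
try:
    with switched_role(session, "DATA_ANALYST") as s:
        print(s.current)  # DATA_ANALYST
        raise RuntimeError("query failed")
except RuntimeError:
    pass
print(session.current)  # SYSADMIN: restored despite the exception
```

Because the finally clause also runs when use_role(role) itself fails, with_role re-activates the original role even if the switch never happened; that extra call is harmless because the role never changed.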