# Sink results

Sinking results from Materialize to external systems.



A [sink](/concepts/sinks/) describes the external system you want Materialize to
write data to and details the encoding of that data. You can sink data from a
**materialized** view, a source, or a table.

## Sink methods

To create a sink, you can:


| Method | External system | Guide(s) or Example(s) |
| --- | --- | --- |
| Use the `COPY TO` command | Amazon S3 or S3-compatible storage | [Sink to Amazon S3](/serve-results/sink/s3/) |
| Use Census as an intermediate step | Census-supported destinations | [Sink to Census](/serve-results/sink/census/) |
| Use `COPY TO` S3 or S3-compatible storage as an intermediate step | Snowflake and other systems that can read from S3 | [Sink to Snowflake](/serve-results/sink/snowflake/) |
| Use a native connector | Kafka/Redpanda | [Sink to Kafka/Redpanda](/serve-results/sink/kafka/) |
| Use a native connector | Apache Iceberg hosted on AWS S3 Tables | [Sink to Iceberg](/serve-results/sink/iceberg/) |
| Use `SUBSCRIBE` | Various | [Sink to Postgres](https://github.com/MaterializeInc/mz-catalog-sync), [Sink to Redis](https://github.com/MaterializeIncLabs/mz-redis-sync) |


### Operational guideline

- Avoid putting sinks on the same cluster that hosts sources to allow for
[blue/green deployment](/manage/dbt/blue-green-deployments).

### Troubleshooting

For help, see [Troubleshooting
sinks](/serve-results/sink/sink-troubleshooting/).



---

## Amazon S3


This guide walks you through the steps required to export results from
Materialize to Amazon S3. Copying results to S3 is useful for tasks like
periodic backups for auditing, or downstream processing in analytical data
warehouses like [Snowflake](/serve-results/snowflake/), Databricks, or
BigQuery.

## Before you begin

- Ensure you have access to an AWS account, and permissions to create and manage
  IAM policies and roles. If you're not an account administrator, you will need
  support from one!

## Step 1. Set up an Amazon S3 bucket

First, you must set up an S3 bucket and give Materialize enough permissions to
write files to it. We **strongly** recommend using [role assumption-based authentication](/sql/create-connection/#aws-permissions)
to manage access to the target bucket.

### Create a bucket

1. Log in to your AWS account.

1. Navigate to **AWS Services**, then **S3**.

1. Create a new, general purpose S3 bucket with the suggested default
   configurations.

### Create an IAM policy

Once you create an S3 bucket, you must associate it with an [IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html)
that specifies what actions can be performed on the bucket by the Materialize
exporter role. For Materialize to be able to write data into the bucket, the
IAM policy must allow the following actions:

Action type  | Action name                                                                            | Action description
-------------|----------------------------------------------------------------------------------------|---------------
Write        | [`s3:PutObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html)      | Grants permission to add an object to a bucket.
List         | [`s3:ListBucket`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) | Grants permission to list some or all of the objects in a bucket.
Write        | [`s3:DeleteObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html)| Grants permission to remove an object from a bucket.

To create a new IAM policy:

1. Navigate to **AWS Services**, then **AWS IAM**.

1. In the **IAM Dashboard**, click **Policies**, then **Create policy**.

1. For **Policy editor**, choose **JSON**.

1. Copy and paste the policy below into the editor, replacing `<bucket>` with
   the bucket name and `<prefix>` with the folder path prefix.

   ```json
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                 "s3:PutObject",
                 "s3:DeleteObject"
               ],
               "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "s3:ListBucket"
               ],
               "Resource": "arn:aws:s3:::<bucket>",
               "Condition": {
                   "StringLike": {
                       "s3:prefix": [
                           "<prefix>/*"
                       ]
                   }
               }
           }
       ]
   }
   ```

1. Click **Next**.

1. Enter a name for the policy, and click **Create policy**.

### Create an IAM role

Next, you must attach the policy you just created to a Materialize-specific
[IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html).

1. Navigate to **AWS Services**, then **AWS IAM**.

1. In the **IAM Dashboard**, click **Roles**, then **Create role**.

1. In **Trusted entity type**, select **Custom trust policy**, and copy and
   paste the policy below.

   ```json
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::664411391173:role/MaterializeConnection"
               },
               "Action": "sts:AssumeRole",
               "Condition": {
                   "StringEquals": {
                       "sts:ExternalId": "PENDING"
                   }
               }
           }
       ]
   }
   ```

   Materialize **always uses the provided IAM principal** to assume the role, and
   also generates an **external ID** which uniquely identifies your AWS connection
   across all Materialize regions (see [AWS connection permissions](/sql/create-connection/#aws-permissions)).
   For now, you'll set this ID to a dummy value; later, you'll update it with
   the unique identifier for your Materialize region.

1. Click **Next**.

1. In **Add permissions**, select the IAM policy you created in [Create an IAM policy](#create-an-iam-policy),
   and click **Next**.

1. Enter a name for the role, and click **Create role**.

1. Click **View role** to see the role summary page, and note down the
   role **ARN**. You will need it in the next step to create an AWS connection in
   Materialize.

## Step 2. Create a connection

1. In the [SQL Shell](/console/), or your preferred SQL
   client connected to Materialize, create an [AWS connection](/sql/create-connection/#aws),
   replacing `<account-id>` with the 12-digit number that identifies your
   AWS account, and `<role>` with the name of the role you created in the
   previous step:

   ```mzsql
   CREATE CONNECTION aws_connection
      TO AWS (ASSUME ROLE ARN = 'arn:aws:iam::<account-id>:role/<role>');
   ```

1. Retrieve the external ID for the connection:

   ```mzsql
   SELECT awsc.id, external_id
    FROM mz_internal.mz_aws_connections awsc
    JOIN mz_connections c ON awsc.id = c.id
    WHERE c.name = 'aws_connection';
   ```

   The external ID will have the following format:

   ```text
   mz_XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX_uXXX
   ```

1. In your AWS account, find the IAM role you created in [Create an IAM role](#create-an-iam-role)
   and, under **Trust relationships**, click **Edit trust policy**. Use the
   `external_id` from the previous step to update the trust policy's
   `sts:ExternalId`, then click **Update policy**.

   > **Warning:** Failing to constrain the external ID in your role trust policy
   > will allow other Materialize customers to assume your role and use AWS
   > privileges you have granted the role!


1. Back in Materialize, validate the AWS connection you created using the
   [`VALIDATE CONNECTION`](/sql/validate-connection) command.

   ```mzsql
   VALIDATE CONNECTION aws_connection;
   ```

   If no validation error is returned, you're ready to use this connection to
   run a bulk export from Materialize to your target S3 bucket! 🔥

## Step 3. Run a bulk export

To export data to your target S3 bucket, use the [`COPY TO`](/sql/copy-to/#copy-to-s3)
command, and the AWS connection you created in the previous step.


**Parquet:**

```mzsql
COPY some_object TO 's3://<bucket>/<path>'
WITH (
    AWS CONNECTION = aws_connection,
    FORMAT = 'parquet'
  );
```

For details on the Parquet writer settings Materialize uses, as well as data
type support and conversion, check the [reference documentation](/sql/copy-to/#copy-to-s3-parquet).



**CSV:**

```mzsql
COPY some_object TO 's3://<bucket>/<path>'
WITH (
    AWS CONNECTION = aws_connection,
    FORMAT = 'csv'
  );
```





You might notice that Materialize first writes a sentinel file to the target S3
bucket. When the copy operation is complete, this file is deleted. This allows
using the [`s3:ObjectRemoved` event](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types)
to trigger downstream processing.
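If you wire the `s3:ObjectRemoved` event to an AWS Lambda function, the handler might look like the following sketch. The event parsing follows the standard S3 notification payload; the downstream trigger (`start_downstream_job`) is a hypothetical placeholder for your own processing step.

```python
# Sketch of an AWS Lambda-style handler for the s3:ObjectRemoved event that
# fires when Materialize deletes its sentinel file at the end of a COPY TO.
# start_downstream_job() is hypothetical; adapt it to your pipeline.

def handle_object_removed(event):
    """Return (bucket, key) pairs whose removal signals a completed copy."""
    completed = []
    for record in event.get("Records", []):
        if not record.get("eventName", "").startswith("ObjectRemoved"):
            continue
        s3 = record["s3"]
        completed.append((s3["bucket"]["name"], s3["object"]["key"]))
    return completed


def lambda_handler(event, context):
    for bucket, key in handle_object_removed(event):
        # start_downstream_job(bucket, key)  # hypothetical downstream trigger
        print(f"Copy to s3://{bucket}/{key} completed")
```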

## Step 4. (Optional) Add scheduling

Bulk exports to Amazon S3 using the `COPY TO` command are _one-shot_: every time
you want to export results, you must run the command. To automate running bulk
exports on a regular basis, you can set up scheduling, for example using a
simple `cron`-like service or an orchestration platform like Airflow or
Dagster.
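As a minimal sketch of such a scheduled export, the script below builds a date-stamped `COPY TO` statement for each run. The object, bucket, and prefix names are hypothetical, and actual execution through a Postgres driver such as psycopg is left as a comment since connection details vary.

```python
# Minimal sketch of a scheduled bulk export. Object, bucket, and path names
# are hypothetical; execution against Materialize is shown only as a comment.
import datetime


def build_copy_statement(obj, bucket, prefix,
                         connection="aws_connection", fmt="parquet"):
    """Build a COPY TO statement with a date-stamped path for each run."""
    date = datetime.date.today().isoformat()
    return (
        f"COPY {obj} TO 's3://{bucket}/{prefix}/{date}'\n"
        f"WITH (AWS CONNECTION = {connection}, FORMAT = '{fmt}');"
    )


if __name__ == "__main__":
    stmt = build_copy_statement("my_view", "my-bucket", "exports")
    print(stmt)
    # With psycopg (hypothetical connection string):
    # with psycopg.connect("postgresql://user@host:6875/materialize") as conn:
    #     conn.execute(stmt)
```

A crontab entry could then invoke the script on a schedule, for example nightly at 2am: `0 2 * * * python export.py` (hypothetical path).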


---

## Apache Iceberg


> **Public Preview:** This feature is in public preview.


Iceberg sinks provide exactly once delivery of updates from Materialize into [Apache
Iceberg](https://iceberg.apache.org/)[^1] tables hosted on [Amazon S3
Tables](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables.html)[^2].
As data changes in Materialize, the corresponding Iceberg tables are
automatically kept up to date. You can sink data from a materialized view, a
source, or a table.

This guide walks you through the steps required to set up Iceberg sinks in
Materialize Cloud.

[^1]: [Apache Iceberg](https://iceberg.apache.org/) is an open table format for
large-scale analytics datasets.

[^2]: [Amazon S3
Tables](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables.html) is
    an AWS feature that provides fully managed Apache Iceberg tables as a native
    S3 storage type.

## Prerequisites

- An AWS account with permissions to create and manage IAM policies and roles.
- An AWS S3 Table bucket in your AWS account. The S3 Table bucket must be in
  the same AWS region as your Materialize deployment.
- A namespace in the AWS S3 Table bucket. For details on creating namespaces,
  see [AWS S3 documentation: Creating a namespace](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-namespace-create.html).

## Step 1. Set up permissions in AWS

In AWS, set up permissions to allow Materialize to write data files to the
object storage backing your Iceberg catalog. This tutorial uses an IAM policy
and IAM role to grant the required permissions. We **strongly** recommend using
[role assumption-based authentication](/sql/create-connection/#aws-permissions)
to manage access.

### Step 1A. Create an IAM policy

Create an [IAM
policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html)
that grants full access to the S3 Tables API for your table bucket. Replace
`<S3 table bucket ARN>` with the ARN of your S3 table bucket:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3tables:*",
            "Resource": [
                "<S3 table bucket ARN>",
                "<S3 table bucket ARN>/table/*"
            ]
        }
    ]
}
```

### Step 1B. Create an IAM role

Create an [IAM
role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) that
Materialize can assume.

1. For the **Trusted entity type**, specify **Custom trust policy** with the
following:
    - `Principal`: The example uses the [Materialize Cloud IAM
      principal](/sql/create-connection/#aws-permissions). For self-managed
      deployments and the Emulator, the principal will differ.
    - `ExternalId`: `"PENDING"` is a placeholder and will be updated after
    creating the AWS connection in Materialize.

    ```json
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::664411391173:role/MaterializeConnection"
                },
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "sts:ExternalId": "PENDING"
                    }
                }
            }
        ]
    }
    ```

1. For permissions, add the [IAM policy created
   earlier](#step-1a-create-an-iam-policy) to grant access to the S3 Tables.

Once you have created the IAM role, copy the role ARN from the AWS console.
You'll use the ARN in the next step.

## Step 2. Create an AWS connection in Materialize

In Materialize, create an **AWS connection** to authenticate with the object
storage:

1. Use [`CREATE CONNECTION ... TO AWS`](/sql/create-connection/#aws) to create
   an AWS connection, replacing:

   - `<IAM role ARN>` with your IAM role ARN from [step
     1](#step-1b-create-an-iam-role)
   - `<region>` with your AWS region (e.g., `us-east-1`):

    ```mzsql
    CREATE CONNECTION aws_connection TO AWS (
        ASSUME ROLE ARN = '<IAM role ARN>',
        REGION = '<region>'
    );
    ```

    For more details on AWS connection options, see [`CREATE
    CONNECTION`](/sql/create-connection/#aws).

1. Fetch the `external_id` for your connection, replacing `<IAM role ARN>` with
    your IAM role ARN:

   ```mzsql
   SELECT external_id
   FROM mz_internal.mz_aws_connections awsc
   JOIN mz_connections c ON awsc.id = c.id
   WHERE c.name = 'aws_connection'
   AND awsc.assume_role_arn = '<IAM role ARN>';
   ```

   You will use the `external_id` to update the IAM role in the next step.

## Step 3. Update the IAM role in AWS

Once you have the `external_id`, update the trust policy for the IAM role
created in [step 1](#step-1b-create-an-iam-role). Replace `"PENDING"` with your
external ID value. Your IAM trust policy should look like the following (but
with your external ID value):

```json{hl_lines="12"}
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::664411391173:role/MaterializeConnection"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "mz_1234abcd-5678-efgh-9012-ijklmnopqrst_u123"
                }
            }
        }
    ]
}
```

## Step 4. Create an Iceberg catalog connection in Materialize

In Materialize, create an **Iceberg catalog connection** for the Iceberg sink to
use. To do so, use [`CREATE CONNECTION ... TO ICEBERG
CATALOG`](/sql/create-connection/#iceberg-catalog), replacing:
- `<region>` with your AWS region (e.g., `us-east-1`) and
- `<S3 table bucket ARN>` with your AWS S3 Table bucket ARN.

The command uses the AWS connection you created earlier.

```mzsql
CREATE CONNECTION iceberg_catalog_connection TO ICEBERG CATALOG (
    CATALOG TYPE = 's3tablesrest',
    URL = 'https://s3tables.<region>.amazonaws.com/iceberg',
    WAREHOUSE = '<S3 table bucket ARN>',
    AWS CONNECTION = aws_connection
);
```

## Step 5. Create the Iceberg sink in Materialize

In Materialize, you can sink from a materialized view, table, or source. Use
[`CREATE SINK`](/sql/create-sink/iceberg) to create an Iceberg sink, replacing:

- `<sink_name>` with a name for your sink.
- `<sink_cluster>` with the name of your sink cluster.
- `<my_materialize_object>` with the name of your materialized view, table, or source.
- `<my_s3_table_bucket_namespace>` with your S3 Table bucket namespace.
- `<my_iceberg_table>` with the name of your Iceberg table. If the Iceberg table
  does not exist, Materialize creates the table. For details, see [`CREATE SINK`
  reference page](/sql/create-sink/iceberg/#iceberg-table-creation).
- `<key>` with the column(s) that uniquely identify rows.
- `<commit_interval>` with your commit interval (e.g., `60s`). The commit
  interval specifies how frequently Materialize commits snapshots to Iceberg.
  The minimum commit interval is `1s`. See [Commit interval
  tradeoffs](#commit-interval-tradeoffs) below.

```mzsql
CREATE SINK <sink_name>
  IN CLUSTER <sink_cluster>
  FROM <my_materialize_object>
  INTO ICEBERG CATALOG CONNECTION iceberg_catalog_connection (
    NAMESPACE = '<my_s3_table_bucket_namespace>',
    TABLE = '<my_iceberg_table>'
  )
  USING AWS CONNECTION aws_connection
  KEY (<key>)
  MODE UPSERT
  WITH (COMMIT INTERVAL = '<commit_interval>');
```

For the full list of syntax options, see the [`CREATE SINK` reference](/sql/create-sink/iceberg).

## Considerations

### Commit interval tradeoffs {#commit-interval-tradeoffs}

The `COMMIT INTERVAL` setting controls how frequently Materialize commits
snapshots to your Iceberg table, making the data available to downstream query
engines. This setting involves tradeoffs:

| Shorter intervals (e.g., < `60s`) | Longer intervals (e.g., `5m`) |
|---------------------------------|-------------------------------|
| Lower latency - data visible sooner in downstream systems | Higher latency - data takes longer to appear |
| More small files - can degrade query performance over time | Fewer, larger files - better query performance |
| More frequent snapshot commits - higher catalog overhead | Less catalog overhead |
| Lower throughput efficiency | Higher throughput efficiency |

**Recommendations:**
- For production, use intervals of `60s` or longer
- For batch analytics, use longer intervals (`5m` to `15m`)

> **Note:** Outside of development environments, commit intervals should be at least `60s`.
> Short commit intervals increase catalog overhead and produce many small files,
> which degrade query performance. They also increase load on the Iceberg
> metadata layer, which can result in a degraded catalog and unresponsive
> queries.
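To make the small-files tradeoff concrete, here is a back-of-the-envelope calculation of how many snapshot commits (and therefore at least as many data files) a given interval produces per day:

```python
# Back-of-the-envelope: snapshot commits per day for a given commit interval,
# assuming at least one data file is written per commit.
def commits_per_day(interval_seconds):
    return 86_400 // interval_seconds

# A 1s interval yields 86,400 commits/day; 60s yields 1,440; 15m yields 96.
```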


### Exactly-once delivery

Iceberg sinks provide **exactly-once delivery**. After a restart, Materialize
resumes from the last committed snapshot without duplicating data.

Materialize stores progress information in Iceberg snapshot metadata
properties (`mz-frontier` and `mz-sink-version`).


### Type mapping

Materialize converts SQL types to Iceberg/Parquet types:

| SQL type | Iceberg type |
|----------|--------------|
| `boolean` | `boolean` |
| `smallint`, `integer` | `int` |
| `bigint` | `long` |
| `real` | `float` |
| `double precision` | `double` |
| `numeric` | `decimal(38, scale)` |
| `date` | `date` |
| `time` | `time` (microsecond) |
| `timestamp` | `timestamp` (microsecond) |
| `timestamptz` | `timestamptz` (microsecond) |
| `text`, `varchar` | `string` |
| `bytea` | `binary` |
| `uuid` | `fixed(16)` |
| `jsonb` | `string` |
| `record` | `struct` |
| `list` | `list` |
| `map` | `map` |

### Limitations

- Your S3 Tables bucket must be in the same AWS region as your Materialize
deployment.

- Partitioned tables are not supported.

- Schema evolution of an Iceberg table is not supported. If the `SINK FROM` object's schema changes, you must drop and recreate the sink.

## Troubleshooting

### Sink creation fails with "input compacted past resume upper"

This error occurs when the source data has been compacted beyond the point where
the sink last committed. This can happen after a Materialize backup/restore
operation. You may need to drop and recreate the sink, which will re-snapshot
the entire source relation.

### Commit conflicts

If another process modifies the Iceberg table while Materialize is committing,
you may see commit conflict errors. Materialize will automatically retry, but
if conflicts persist, ensure no other writers are modifying the same table.

## Related pages

- [`CREATE SINK`](/sql/create-sink/iceberg)
- [`CREATE CONNECTION`](/sql/create-connection)
- [Apache Iceberg documentation](https://iceberg.apache.org/docs/latest/)


---

## Census


This guide walks you through the steps required to create a [Census](https://www.getcensus.com/) sync using Materialize.

## Before you begin

To build a sync with Census, you will need:

* A table, view, materialized view, or source within your Materialize account that you would like to export.
* A [Braze](https://www.braze.com/) account. Census supports a number of possible [destinations](https://www.getcensus.com/integrations); we will use Braze as an example.

## Step 1. Set up a Materialize Source

To begin, you will need to add your Materialize database as a source in Census.

1. In Census, navigate to **Sources** and then click **New Source**.

1. From the list of connection types, choose **Materialize**.

1. Set the connection parameters using the credentials provided in the [Materialize console](/console/).
   Then click the **Connect** button.

## Step 2. Set up a Destination

Next you will add a destination where data will be sent.


**Braze:**

1. In Census, navigate to **Destinations** and then click **New Destination**.

1. From the list of destination types, choose **Braze**.

1. You will need to supply your Braze URL (most likely `https://rest.iad-03.braze.com`) and a Braze API key.
   The [Census guide for Braze](https://docs.getcensus.com/destinations/braze) explains how to create an API key with the
   correct permissions. Then click **Connect**.




## Step 3. Create a Sync

After successfully adding the Materialize source, you can create a sync to send data from Materialize to your downstream destination.


**Braze:**

1. In Census, navigate to **Syncs** and then click **New Sync**.

1. Under **Select a Source** choose **Select a Warehouse Table**. Using the drop-down, choose the Materialize source that was
   configured in step 1 as the **Connection**. Using the **Schema** and **Table** drop-downs you can select the
   Materialize object you would like to export.

1. Under **Select a Destination** choose the Braze destination configured in step 2 and select "User" as the **Object**.

1. Under **Select Sync Behavior**, choose "Update or Create". This will only add and modify data in Braze, never delete users.

1. Under **Select a Sync Key** select an id column from the Materialize object.

1. Under **Set Up Braze Field Mappings** set any of the columns in the Materialize object to their corresponding fields in the Braze User entity.

1. Click **Next** to see an overview of your sync and click **Create** to create the sync.




## Step 4. Add a Schedule (Optional)

Your Census sync is created and ready to run. You can invoke it manually, but a
schedule will ensure that all new data is sent to the destination.

1. In Census, navigate to **Syncs** and select the sync that was just created.

1. Within your sync toolbar, click **Configuration**. In **Sync Trigger > Schedule**, you can select from a number of
   different schedules. If you are using a source or materialized view as your source object, you can choose "Continuous"
   and Census will retrieve new data as soon as it exists within Materialize.


---

## Kafka and Redpanda


<!-- Ported over content from sink-kafka.md. -->

## Connectors

Materialize bundles a **native connector** that allows writing data to Kafka and
Redpanda. When a user defines a sink to Kafka/Redpanda, Materialize
automatically generates the required schema and writes out the stream of
changes to that view or source. In effect, Materialize sinks act as change data
capture (CDC) producers for the given source or view.

For details on the connector, including syntax, supported formats and examples,
refer to [`CREATE SINK`](/sql/create-sink/kafka).

> **Tip:** Redpanda uses the same [`CREATE SINK`](/sql/create-sink/kafka) syntax as Kafka.


## Features

### Memory use during creation

During creation, sinks need to load an entire snapshot of the data in memory.

### Automatic topic creation

If the specified Kafka topic does not exist, Materialize will attempt to create
it using the broker's default number of partitions, default replication factor,
default compaction policy, and default retention policy, unless any specific
overrides are provided as part of the [connection
options](/sql/create-sink/kafka#syntax).

If the connection's [progress
topic](/sql/create-sink/kafka#exactly-once-processing) does not exist,
Materialize will attempt to create it with a single partition, the broker's
default replication factor, compaction enabled, and both size- and time-based
retention disabled. The replication factor can be overridden using the
`PROGRESS TOPIC REPLICATION FACTOR` option in [`CREATE
CONNECTION`](/sql/create-connection).

To customize topic-level configuration, including compaction settings and other
values, use the `TOPIC CONFIG` option in the [connection
options](/sql/create-sink/kafka#syntax) to set any relevant Kafka
[topic configs](https://kafka.apache.org/documentation/#topicconfigs).

If you manually create the topic or progress topic in Kafka before
running `CREATE SINK`, observe the following guidance:

| Topic          | Configuration       | Guidance
|----------------|---------------------|---------
| Data topic     | Partition count     | Your choice, based on your performance and ordering requirements.
| Data topic     | Replication factor  | Your choice, based on your durability requirements.
| Data topic     | Compaction          | Your choice, based on your downstream applications' requirements. If using the [Upsert envelope](/sql/create-sink/kafka#upsert), enabling compaction is typically the right choice.
| Data topic     | Retention           | Your choice, based on your downstream applications' requirements.
| Progress topic | Partition count     | **Must be set to 1.** Using multiple partitions can cause Materialize to violate its [exactly-once guarantees](/sql/create-sink/kafka#exactly-once-processing).
| Progress topic | Replication factor  | Your choice, based on your durability requirements.
| Progress topic | Compaction          | We recommend enabling compaction to avoid accumulating unbounded state. Disabling compaction may cause performance issues, but will not cause correctness issues.
| Progress topic | Retention           | **Must be disabled.** Enabling retention can cause Materialize to violate its [exactly-once guarantees](/sql/create-sink/kafka#exactly-once-processing).
| Progress topic | Tiered storage      | We recommend disabling tiered storage to allow for more aggressive data compaction. Fully compacted data requires minimal storage, typically only tens of bytes per sink, making it cost-effective to maintain directly on local disk.
| Progress topic | Segment bytes       | Defaults to 128 MiB. We recommend going no higher than 256 MiB to avoid slow startups when creating new sinks, as they must process the entire progress topic on startup.

> **Warning:** Dropping a Kafka sink doesn't drop the corresponding topic. For more information, see the [Kafka documentation](https://kafka.apache.org/documentation/).


### Exactly-once processing

By default, Kafka sinks provide [exactly-once processing guarantees](https://kafka.apache.org/documentation/#semantics), which ensures that messages are not duplicated or dropped in failure scenarios.

To achieve this, Materialize stores some internal metadata in an additional
*progress topic*. This topic is shared among all sinks that use a particular
[Kafka connection](/sql/create-connection/#kafka). The name of the progress
topic can be specified when [creating a
connection](/sql/create-connection/#kafka); otherwise, a default name of
`_materialize-progress-{REGION ID}-{CONNECTION ID}` is used. In either case,
Materialize will attempt to create the topic if it does not exist. The contents
of this topic are not user-specified.
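For reference, the default name can be sketched as a simple format string; the region and connection ID values below are hypothetical, and Materialize substitutes the real identifiers for your deployment:

```python
# Sketch of the default progress topic name. The region and connection IDs
# are hypothetical placeholders.
def default_progress_topic(region_id, connection_id):
    return f"_materialize-progress-{region_id}-{connection_id}"
```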

#### End-to-end exactly-once processing

Exactly-once semantics are an end-to-end property of a system, but Materialize
only controls the initial produce step. To ensure _end-to-end_ exactly-once
message delivery, you should ensure that:

- The broker is configured with a replication factor of 3 or greater, with unclean
  leader election disabled (`unclean.leader.election.enable=false`).
- All downstream consumers are configured to only read committed data
  (`isolation.level=read_committed`).
- The consumers' processing is idempotent, and offsets are only committed when
  processing is complete.

For more details, see [the Kafka documentation](https://kafka.apache.org/documentation/).

### Partitioning

By default, Materialize assigns a partition to each message using the following
strategy:

  1. Encode the message's key in the specified format.
  2. If the format uses a Confluent Schema Registry, strip out the
     schema ID from the encoded bytes.
  3. Hash the remaining encoded bytes using SeaHash.
  4. Divide the hash value by the topic's partition count and assign the
     remainder as the message's partition.

If a message has no key, it is sent to partition 0.

To configure a custom partitioning strategy, you can use the `PARTITION BY`
option. This option allows you to specify a SQL expression that computes a hash
for each message, which determines what partition to assign to the message:

```mzsql
-- General syntax.
CREATE SINK ... INTO KAFKA CONNECTION <name> (PARTITION BY = <expression>) ...;

-- Example.
CREATE SINK ... INTO KAFKA CONNECTION <name> (
    PARTITION BY = kafka_murmur2(name || address)
) ...;
```

The expression:
  * Must have a type that can be assignment cast to `uint8`.
  * Can refer to any column in the sink's underlying relation when using the
    [upsert envelope](/sql/create-sink/kafka#upsert-envelope).
  * Can refer to any column in the sink's key when using the
    [Debezium envelope](/sql/create-sink/kafka#debezium-envelope).

Materialize uses the computed hash value to assign a partition to each message
as follows:

  1. If the hash is `NULL` or computing the hash produces an error, assign
     partition 0.
  2. Otherwise, divide the hash value by the topic's partition count and assign
     the remainder as the message's partition (i.e., `partition_id = hash %
     partition_count`).
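The assignment logic above can be sketched in a few lines. Note that `zlib.crc32` stands in for SeaHash here purely for illustration, since SeaHash is not in the Python standard library; the real sink hashes the encoded key bytes with SeaHash.

```python
# Sketch of Materialize's default partition assignment. zlib.crc32 is an
# illustrative stand-in for SeaHash.
import zlib


def assign_partition(encoded_key, partition_count):
    """Hash the encoded key bytes, then take the remainder mod partition count."""
    if encoded_key is None:  # keyless messages go to partition 0
        return 0
    return zlib.crc32(encoded_key) % partition_count


# The same key always lands on the same partition.
partition = assign_partition(b"user-42", 6)
```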

Materialize provides several [hash functions](/sql/functions/#hash-functions)
which are commonly used in Kafka partition assignment:

  * `crc32`
  * `kafka_murmur2`
  * `seahash`

For a full example of using the `PARTITION BY` option, see [Custom
partitioning](/sql/create-sink/kafka#custom-partitioning).

### Kafka transaction markers

Materialize uses [Kafka
transactions](https://www.confluent.io/blog/transactions-apache-kafka/). When
Kafka transactions are used, special control messages known as **transaction
markers** are published to the topic. Transaction markers inform both the broker
and clients about the status of a transaction. When a topic is read using a
standard Kafka consumer, these markers are not exposed to the application, which
can give the impression that some offsets are being skipped.


---

## S3 Compatible Object Storage


This guide walks you through the steps required to export results from
Materialize to an S3-compatible object storage service, such as Google
Cloud Storage or Cloudflare R2.

## Before you begin

- Make sure that you have set up your bucket.
- Obtain the following for your bucket. Instructions to obtain these vary by provider.
  - The S3-compatible URI (`S3_BUCKET_URI`)
    - GCS URIs (beginning with `gs://`) are also valid.
  - The S3-compatible access tokens (`ACCESS_KEY_ID` and `SECRET_ACCESS_KEY`)

## Step 1. Create a connection

1. In the [SQL Shell](/console/), or your preferred SQL
   client connected to Materialize, create an [AWS connection](/sql/create-connection/#aws),
   replacing `<ACCESS_KEY_ID>` and `<SECRET_ACCESS_KEY>` with the credentials
   for your bucket. The AWS connection can be used to connect to any
   S3-compatible object storage service by specifying the endpoint and the region.

   For example, to connect to Google Cloud Storage, you can run the following:

    ```mzsql
    CREATE SECRET secret_access_key AS '<SECRET_ACCESS_KEY>';
    CREATE CONNECTION bucket_connection TO AWS (
        ACCESS KEY ID = '<ACCESS_KEY_ID>',
        SECRET ACCESS KEY = SECRET secret_access_key,
        ENDPOINT = 'https://storage.googleapis.com',
        REGION = 'us'
    );
    ```

> **Warning:** `VALIDATE CONNECTION` only works for AWS S3 connections. Using `VALIDATE CONNECTION` to test a connection to an S3-compatible object storage service will result in an error. However, you can still use the connection to copy data.


## Step 2. Run a bulk export

To export data to your target bucket, use the [`COPY TO`](/sql/copy-to/#copy-to-s3)
command and the AWS connection you created in the previous step. Replace
`<S3_BUCKET_URI>` with the S3-compatible URI for your target bucket.


**Parquet:**

```mzsql
COPY some_object TO '<S3_BUCKET_URI>'
WITH (
    AWS CONNECTION = bucket_connection,
    FORMAT = 'parquet'
  );
```

For details on the Parquet writer settings Materialize uses, as well as data
type support and conversion, check the [reference documentation](/sql/copy-to/#copy-to-s3-parquet).



**CSV:**

```mzsql
COPY some_object TO '<S3_BUCKET_URI>'
WITH (
    AWS CONNECTION = bucket_connection,
    FORMAT = 'csv'
  );
```





## Step 3. (Optional) Add scheduling

Bulk exports to object storage using the `COPY TO` command are _one-shot_: every time
you want to export results, you must run the command. To automate running bulk
exports on a regular basis, you can set up scheduling, for example using a
simple `cron`-like service or an orchestration platform like Airflow or
Dagster.
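
As a sketch of what a scheduled job might do, the helper below templates the `COPY TO` statement with a per-run timestamped prefix so that each export lands under its own path; a scheduler (cron, Airflow, Dagster) would then execute the resulting statement against Materialize via any PostgreSQL-compatible driver. The object name, bucket URI, and `bucket_connection` name are placeholders, not real values:

```python
from datetime import datetime, timezone

def copy_to_statement(obj: str, bucket_uri: str, ts: datetime) -> str:
    """Build a COPY TO statement that writes each scheduled run under
    its own timestamped prefix. obj, bucket_uri, and bucket_connection
    are placeholders for your own objects."""
    prefix = ts.strftime("%Y%m%dT%H%M")
    return (
        f"COPY {obj} TO '{bucket_uri}/{prefix}/'\n"
        "WITH (AWS CONNECTION = bucket_connection, FORMAT = 'parquet');"
    )

stmt = copy_to_statement("some_object", "s3://my-bucket/exports",
                         datetime(2024, 1, 1, tzinfo=timezone.utc))
print(stmt)
```

Keeping each run under a distinct prefix also makes it easy to identify, retain, or expire individual exports downstream.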


---

## Snowflake


[//]: # "TODO(morsapaes) For Kafka users, it's possible to sink data to
Snowflake continuously using the Snowflake connector for Kafka. We should also
document that approach."

> **Public Preview:** This feature is in public preview.


This guide walks you through the steps required to bulk-export results from
Materialize to Snowflake using Amazon S3 as the intermediate object store.

## Before you begin

- Ensure you have access to an AWS account, and permissions to create and manage
  IAM policies and roles. If you're not an account administrator, you will need
  support from one!

- Ensure you have access to a Snowflake account, and are able to connect as a
  user with either the [`ACCOUNTADMIN` role](https://docs.snowflake.com/en/user-guide/security-access-control-considerations#using-the-accountadmin-role),
  or a role with the [global `CREATE INTEGRATION` privilege](https://docs.snowflake.com/en/user-guide/security-access-control-privileges#global-privileges-account-level-privileges).

## Step 1. Set up bulk exports to Amazon S3

Follow the [Amazon S3 integration guide](/serve-results/s3/) to set up an Amazon
S3 bucket that Materialize securely writes data into. This will be your
starting point for bulk-loading Materialize data into Snowflake.

## Step 2. Configure a Snowflake storage integration

### Create an IAM policy

To bulk-load data from an S3 bucket into Snowflake, you must create a new
[IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html)
that specifies what actions can be performed on the bucket by the Snowflake
importer role. For Snowflake to be able to read data from the bucket, the IAM
policy must allow the following actions:

Action type  | Action name                                                                            | Action description
-------------|----------------------------------------------------------------------------------------|---------------
Read         | [`s3:GetBucketLocation`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html) | Grants permission to return the region the bucket is hosted in.
Read         | [`s3:GetObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html) | Grants permission to retrieve objects from a bucket.
Read        | [`s3:GetObjectVersion`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html)| Grants permission to retrieve a specific version of an object from a bucket.
List        | [`s3:ListBucket`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)| Grants permission to list some or all of the objects in a bucket.
Write        | [`s3:DeleteObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html) | (Optional) Grants permission to remove an object from a bucket.
Write        | [`s3:DeleteObjectVersion`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html) | (Optional) Grants permission to remove a specific version of an object from a bucket.
Write        | [`s3:PutObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) | (Optional) Grants permission to add an object to a bucket.

To create a new IAM policy:

1. Navigate to **AWS Services**, then **AWS IAM**.

1. In the **IAM Dashboard**, click **Policies**, then **Create policy**.

1. For **Policy editor**, choose **JSON**.

1. Copy and paste the policy below into the editor, replacing `<bucket>` with
   the bucket name and `<prefix>` with the folder path prefix.

   ```json
   {
      "Version": "2012-10-17",
      "Statement": [
         {
            "Effect": "Allow",
            "Action": [
               "s3:PutObject",
               "s3:GetObject",
               "s3:GetObjectVersion",
               "s3:DeleteObject",
               "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
         },
         {
            "Effect": "Allow",
            "Action": [
               "s3:ListBucket",
               "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::<bucket>",
            "Condition": {
               "StringLike": {
                  "s3:prefix": [
                     "<prefix>/*"
                  ]
               }
            }
         }
      ]
   }
   ```

1. Click **Next**.

1. Enter a name for the policy, and click **Create policy**.

### Create an IAM role

Next, you must attach the policy you just created to a Snowflake-specific
[IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html).

1. Navigate to **AWS Services**, then **AWS IAM**.

1. In the **IAM Dashboard**, click **Roles**, then **Create role**.

1. In **Trusted entity type**, select **Account ID**, then **This account**.
   Later, you'll update it with the unique identifier for your Snowflake account.

1. Check the **Require external ID** box. Enter a placeholder **External ID**
   (e.g. 0000). Later, you'll update it with the unique external ID for your
   Snowflake storage integration.

1. Click **Next**.

1. In **Add permissions**, select the IAM policy you created in [Create an IAM policy](#create-an-iam-policy),
   and click **Next**.

1. Enter a name for the role, and click **Create role**.

1. Click **View role** to see the role summary page, and note down the
   role **ARN**. You will need it in the next step to create a Snowflake storage
   integration.

### Create a Snowflake storage integration

> **Note:** Only users with either the [`ACCOUNTADMIN` role](https://docs.snowflake.com/en/user-guide/security-access-control-considerations#using-the-accountadmin-role),
> or a role with the [global `CREATE INTEGRATION` privilege](https://docs.snowflake.com/en/user-guide/security-access-control-privileges#global-privileges-account-level-privileges)
> can execute this step.


1. In [Snowsight](https://app.snowflake.com/), or your preferred SQL client
connected to Snowflake, create a [storage integration](https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration),
replacing `<role>` with the name of the role you created in the previous step:

   ```sql
   CREATE STORAGE INTEGRATION s3_int
     TYPE = EXTERNAL_STAGE
     STORAGE_PROVIDER = 'S3'
     ENABLED = TRUE
     STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/<role>'
     STORAGE_ALLOWED_LOCATIONS = ('*');
   ```

1. Retrieve the IAM principal for your Snowflake account using the
   [`DESC INTEGRATION`](https://docs.snowflake.com/en/sql-reference/sql/desc-integration)
   command:

   ```sql
   DESC INTEGRATION s3_int;
   ```

   Note down the values for the `STORAGE_AWS_IAM_USER_ARN` and
   `STORAGE_AWS_EXTERNAL_ID` properties. You will need them in the next step to
   update the Snowflake trust policy attached to your S3 bucket.

### Update the IAM policy

1. In your AWS account, find the IAM role you created in [Create an IAM role](#create-an-iam-role)
   and, under **Trust relationships**, click **Edit trust policy**. Use the values
   for the `STORAGE_AWS_IAM_USER_ARN` and `STORAGE_AWS_EXTERNAL_ID` properties
   from the previous step to update the trust policy's `Principal` and
   `ExternalId`, then click **Update policy**.

## Step 3. Create a Snowflake external stage

Back in Snowflake, create an [external stage](https://docs.snowflake.com/en/user-guide/data-load-S3-create-stage#external-stages) that uses the storage integration above and references your S3 bucket.

```sql
CREATE STAGE s3_stage
  STORAGE_INTEGRATION = s3_int
  URL = 's3://<bucket>/<prefix>/';
```

> **Note:** To create a stage that uses a storage integration, the active user must have a
> role with the [`CREATE STAGE` privilege](https://docs.snowflake.com/en/sql-reference/sql/create-stage)
> for the active schema, as well as the [`USAGE` privilege](https://docs.snowflake.com/en/sql-reference/sql/grant-privilege#syntax)
> on the relevant storage integration.


## Step 4. Import data into Snowflake

To import the data stored in S3 into Snowflake, you can then create a table and
use the [`COPY INTO`](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table)
command to load it from the external stage.


**Parquet:**

Create a table with a single column of type [`VARIANT`](https://docs.snowflake.com/en/sql-reference/data-types-semistructured#variant):

```sql
CREATE TABLE s3_table_parquet (
    col VARIANT
);
```

Use `COPY INTO` to load the data into the table:

```sql
COPY INTO s3_table_parquet
  FROM @s3_stage
  FILE_FORMAT = (TYPE = 'PARQUET');
```

For more details on importing Parquet files staged in S3 into Snowflake, check the
[Snowflake documentation](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table#type-parquet).



**CSV:**

Create a table with the same number of columns as the number of delimited
columns in the input data file:

```sql
CREATE TABLE s3_table_csv (
    col_1 INT,
    col_2 TEXT,
    col_3 TIMESTAMP
);
```

Use `COPY INTO` to load the data into the table:

```sql
COPY INTO s3_table_csv
  FROM @s3_stage
  FILE_FORMAT = (TYPE = 'CSV');
```

For more details on importing CSV files staged in S3 into Snowflake, check the
[Snowflake documentation](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table#type-csv).






## Step 5. (Optional) Add scheduling

Bulk exports to Amazon S3 using the `COPY TO` command are _one-shot_: every time
you want to export results, you must run the command. To automate running bulk
exports from Materialize to Snowflake on a regular basis, you can set up
scheduling, for example using a simple `cron`-like service or an orchestration
platform like Airflow or Dagster.


---

## Troubleshooting sinks


<!-- Copied over from the old manage/troubleshooting guide -->
## Why isn't my sink exporting data?
First, look for errors in [`mz_sink_statuses`](/reference/system-catalog/mz_internal/#mz_sink_statuses):

```mzsql
SELECT * FROM mz_internal.mz_sink_statuses
WHERE name = '<SINK_NAME>';
```

If your sink reports a status of `stalled` or `failed`, you likely have a
configuration issue. The returned `error` field will provide details.

If your sink reports a status of `starting` for more than a few minutes,
[contact support](/support).

## How do I monitor sink ingestion progress?

Repeatedly query the
[`mz_sink_statistics`](/reference/system-catalog/mz_internal/#mz_sink_statistics)
table and look for ingestion statistics that advance over time:

```mzsql
SELECT
    messages_staged,
    messages_committed,
    bytes_staged,
    bytes_committed
FROM mz_internal.mz_sink_statistics
WHERE id = '<SINK_ID>';
```

(You can also look at statistics for individual worker threads to evaluate
whether ingestion progress is skewed, but it's generally simplest to start
by looking at the aggregate statistics for the whole sink.)

The `messages_staged` and `bytes_staged` statistics should roughly correspond
with what Materialize has written (but not necessarily committed) to the
external service. For example, the `bytes_staged` and `messages_staged` fields
for a Kafka sink should roughly correspond with how many messages Materialize
has written to the Kafka topic, and how big they are (including the key), but
the Kafka transaction for those messages might not have been committed yet.

`messages_committed` and `bytes_committed` correspond to the number of messages
committed to the external service. These numbers can be _smaller_ than the
`*_staged` statistics, because Materialize might fail to write transactions and
retry them.

If any of these statistics are not making progress, your sink might be stalled
or need to be scaled up.

If the `*_staged` statistics are making progress, but the `*_committed` ones
are not, there may be a configuration issue with the external service that is
preventing Materialize from committing transactions. Check the `error`
column in `mz_sink_statuses`, which can provide more information.
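
As an illustrative check (a hypothetical helper, not part of Materialize), two successive snapshots of these statistics are enough to detect that pattern:

```python
def committed_is_lagging(prev: dict, curr: dict) -> bool:
    """True when staged counters advanced between two snapshots of
    mz_sink_statistics but committed counters did not -- the signature
    of a sink that stages writes it cannot commit."""
    staged_advanced = curr["messages_staged"] > prev["messages_staged"]
    committed_stalled = curr["messages_committed"] <= prev["messages_committed"]
    return staged_advanced and committed_stalled

prev = {"messages_staged": 100, "messages_committed": 100}
curr = {"messages_staged": 250, "messages_committed": 100}
print(committed_is_lagging(prev, curr))  # True: staged moved, committed did not
```

A monitoring job could poll `mz_sink_statistics` on an interval and alert when this condition holds across several consecutive snapshots.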

