How to Replicate Couchbase data to BigQuery using Mage.ai?

Haithem Souala
Published in Woop Technology · 5 min read · Mar 31, 2023


In today’s data-driven world, businesses are seeking efficient ways to manage and streamline their data processes. Recently, at WOOP, we leveraged Mage.ai, a powerful data pipeline tool, to synchronize data between Couchbase and BigQuery. We use the Couchbase ecosystem as a database and for synchronization between Mapotempo Live and Mapotempo Web, the route optimization and route planning solutions of our last-mile delivery management system. This has allowed us to maintain real-time data synchronization between our mobile and web applications, providing a seamless user experience for our customers.

The problem:

The process of replicating data from Couchbase, a NoSQL database, to BigQuery presented a few challenges. Firstly, there was no BigQuery connector provided by Couchbase or by Airbyte, the ELT tool we use to synchronize data from various sources such as PostgreSQL and MongoDB. Moreover, technical constraints on the Couchbase cluster prevented us from creating an index to query the Couchbase documents and transfer the data through the built-in connector offered by Mage.ai (a big thanks to the Mage.ai team for developing a Couchbase connector in just a couple of days).

Given the challenges we faced, we needed to devise a solution that would be efficient and reliable while not requiring significant implementation time: a win-win approach.

Solution N°1:

The first option is to use a built-in connector provided by Mage.ai or any third party. This is a straightforward process that involves setting up a connection between the two systems and configuring the data synchronization settings. This approach is ideal if you are able to create the necessary index.

To achieve this, you must have Mage.ai installed. Refer to the documentation for instructions on deploying it. If you experience any difficulties, don’t hesitate to contact the team via Slack; they are extremely helpful.

Mage.ai
  1. Create a new Data Integration Pipeline
  2. Select Couchbase as Source
  3. In the configuration section, provide the required information
  4. Configure the Transformer and Exporter stages, and you are ready to deploy the pipeline.

Pay attention to the connection_string: if you’re connecting to Capella, enable TLS by using couchbases:// (note the final ‘s’) instead of couchbase://. For more information about TLS, see Secure Connections.
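For illustration, the source configuration could look something like the snippet below (shown as YAML for readability). Only connection_string comes from the note above; the other key names are assumptions, since the exact fields are defined by the Mage.ai Couchbase connector, so check its documentation before copying this.

connection_string: couchbases://cb.example.cloud  # final ‘s’ enables TLS for Capella
username: couchbase_user                          # placeholder credentials
password: "********"
bucket: bucket-name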

Solution N°2:

If you’re not able to use a connector, you can still use the power of Mage.ai and GCP services to replicate the data from Couchbase.

After some research, we found that Sync Gateway creates an index, sg_allDocs_x1, that can be used to query the documents’ metadata, such as the document ID, sequence, and flags.

To use this approach, you will need to create a script that reads the metadata from Couchbase, fetches the corresponding documents, and publishes each document as a message, using the incredible integration between GCP Pub/Sub and BigQuery.

Set up your destination

Before we can start building our pipeline, we need to configure Pub/Sub and BigQuery. First, create a BigQuery dataset and table to receive the messages. Note that the table must contain a field called “data”. Once you have set up the table, it’s time to create a Pub/Sub topic. After creating the topic, you need to set up a subscription that will push messages to BigQuery.
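If you prefer to script this setup rather than click through the console, a minimal sketch using the Google Cloud Python clients is shown below. The project, dataset, table, topic, and subscription names are placeholders chosen for this example, and it assumes the Pub/Sub service account has permission to write to BigQuery.

from google.cloud import bigquery, pubsub_v1

# BigQuery: create a dataset and a table with a single "data" column
bq_client = bigquery.Client(project="my-project")  # placeholder project ID
bq_client.create_dataset("couchbase_sync", exists_ok=True)
table = bigquery.Table(
    "my-project.couchbase_sync.documents",
    schema=[bigquery.SchemaField("data", "STRING")],
)
bq_client.create_table(table, exists_ok=True)

# Pub/Sub: create a topic and a subscription that writes to the table above
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path("my-project", "couchbase-docs")
subscription_path = subscriber.subscription_path("my-project", "couchbase-docs-to-bq")

publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        # Without write_metadata, the message body lands in the "data" column
        "bigquery_config": pubsub_v1.types.BigQueryConfig(
            table="my-project.couchbase_sync.documents",
        ),
    }
)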

When setting up the new subscription to the topic in the console, select the Write to BigQuery delivery type and point it at the table you created earlier.

Once your subscription is set up, you’re ready to publish a message to your Pub/Sub topic.
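To quickly verify the plumbing, you can publish a small test message and check that a row appears in the data column of the BigQuery table a few moments later (the project and topic names are the same placeholders used above):

import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "couchbase-docs")

# Publish a tiny JSON payload; future.result() blocks until Pub/Sub returns a message ID
future = publisher.publish(topic_path, data=json.dumps({"hello": "bigquery"}).encode("utf-8"))
print(f"Published test message {future.result()}")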

Mage.ai, Data plumbing without hassle

To build a custom pipeline that fetches data from Couchbase and streams it to Pub/Sub, we will use Mage.ai. If you’re not already familiar with the tool, check out the documentation; it contains all the information you need to set it up.

So, let’s start by creating a data loader block from the generic Python template; it will contain all the logic. Note that you can split the code into several blocks if you want to transform the data, or simply for better readability and maintainability.
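All the snippets that follow live inside that single data loader block. A Mage-generated block looks roughly like the skeleton below (the exact template may vary between versions); pipeline variables such as COUCHBASE_URI are available through kwargs.

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs):
    # The connection, query, and publishing logic shown in the next
    # snippets goes here; kwargs carries the pipeline variables.
    ...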

The initial step is to establish a connection to the Couchbase cluster:

# Imports used across the snippets below (Couchbase Python SDK,
# Google Cloud Pub/Sub and the Mage secrets helper)
import json
import logging
import os
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from google.cloud import pubsub_v1
from google.oauth2 import service_account
from mage_ai.data_preparation.shared.secrets import get_secret_value

# Authenticate to Google Cloud Pub/Sub using a service account JSON key
credentials = service_account.Credentials.from_service_account_file('sa.json')
publisher = pubsub_v1.PublisherClient(credentials=credentials)

# Connect to the Couchbase cluster
cluster = Cluster(kwargs['COUCHBASE_URI'], ClusterOptions(
    PasswordAuthenticator(kwargs['COUCHBASE_USERNAME'], get_secret_value('COUCHBASE_PASSWORD'))
))

# Wait until the cluster is ready for use.
logging.info("Waiting for CB cluster to be ready…")
cluster.wait_until_ready(timedelta(seconds=2))
logging.info("CB cluster is ready…")

# Open the bucket and its default collection
bucket = cluster.bucket(kwargs['COUCHBASE_BUCKET'])
collection = bucket.default_collection()

Upon successfully connecting to the Couchbase cluster, we need to obtain the sequence number of the last batched document if it exists. If not, this is considered the first synchronization:

# Read the last sequence number from the file, or set it to 0 if the file doesn't exist
seq_file_path = kwargs['LAST_SEQUENCE_PATH_PROD']
if os.path.exists(seq_file_path):
    with open(seq_file_path, 'r') as f:
        last_seq = int(f.read())
else:
    last_seq = 0

The sequence attribute is useful for implementing an incremental sync.

Next, we will query the database to obtain the metadata, specifically the sequence number and document ID, which will be used to retrieve the document data.

# Define the N1QL query (replace `bucket-name` with your own bucket)
raw_query = f'''
SELECT META(db).id AS id,
       META(db).xattrs._sync.sequence AS seq,
       META(db).xattrs._sync.flags AS flags
FROM `bucket-name` AS db USE INDEX (sg_allDocs_x1)
WHERE META(db).xattrs._sync.sequence > {last_seq}
  AND META(db).xattrs._sync.sequence < 4073709551615
  AND META(db).id NOT LIKE "\\\_sync:%"
ORDER BY META(db).xattrs._sync.sequence
LIMIT 10000
'''

# Query the couchbase database to retrieve the document metadata
logging.info('Querying metadata…')
result = cluster.query(raw_query)

logging.info('Fetching result…')
for row in result.rows():
    doc_id = row['id']

    # Retrieve the document by id and extract its content as a dict
    doc = collection.get(doc_id).content_as[dict]

    # Convert the document to a JSON string and publish it to the Pub/Sub topic
    message = json.dumps(doc).encode('utf-8')
    future = publisher.publish(kwargs['TOPIC_NAME'], data=message)

    # Log the document ID and message ID
    logging.info(f'Document ID: {doc_id}, Message ID: {future.result()}')
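One last detail: for the incremental sync to survive across runs, the highest sequence number processed in the batch has to be written back to the sequence file. This step isn’t shown above, so the sketch below is only one way to do it, tracking the largest seq inside the loop and persisting it once the batch is done.

# Inside the for-loop above, keep track of the highest sequence published so far
max_seq = max(last_seq, row['seq'])

# Once the batch has been published, persist it so the next run only fetches newer documents
with open(seq_file_path, 'w') as f:
    f.write(str(max_seq))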

Wrapping up

In conclusion, managing and synchronizing data from different sources can be a challenging task, especially when dealing with technical constraints and a lack of connectors. However, with the help of powerful data pipeline tools like Mage.ai, businesses can efficiently manage their data processes and synchronize data between different databases, such as Couchbase and BigQuery. The two solutions presented in this article, using a built-in connector or leveraging GCP services, demonstrate the flexibility and reliability of Mage.ai in addressing different data synchronization scenarios.
