Stream: a partitioned, append-only log of messages.
Message: a Base64-encoded record that is published to a stream.
Offset: the location of a message within a partition.
Producer: an entity that writes/publishes messages to a stream.
Consumer: an entity that reads messages from one or more streams.
Partition: a section of a stream. It allows you to distribute a stream by splitting messages across multiple nodes. Each partition can be placed on a separate machine to allow for multiple consumers to read from a stream in parallel.
Cursor: a pointer to a location (offset or time) in a stream.
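AT_OFFSET: consume starting at a specified offset.
AFTER_OFFSET: consume starting after the given offset.
TRIM_HORIZON: consume from the oldest available message.
AT_TIME: consume from a given time. The timestamp of the first message returned is on or after the supplied time.
LATEST: consume only messages published after the cursor was created.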
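To make the cursor options concrete, here is a minimal in-memory sketch of where each kind of cursor would start reading in a single partition. It does not use the OCI SDK. `CursorSketch`, `Msg`, `startIndex`, and `sampleLog` are hypothetical names invented for this illustration, and the enum constants are assumed to mirror the service's cursor type names.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of one partition; illustrative only, not the OCI SDK. */
final class CursorSketch {

    /** Assumed to mirror the cursor types offered by the service. */
    enum CursorType { AT_OFFSET, AFTER_OFFSET, TRIM_HORIZON, AT_TIME, LATEST }

    /** A stored message: its offset in the partition and its publish time. */
    static final class Msg {
        final long offset;
        final long timestampMillis;
        final String value;

        Msg(long offset, long timestampMillis, String value) {
            this.offset = offset;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /**
     * Returns the index in log of the first message a cursor would read.
     * arg is an offset for AT_OFFSET and AFTER_OFFSET, a timestamp for AT_TIME,
     * and ignored otherwise. A result equal to log.size() means "nothing yet".
     */
    static int startIndex(List<Msg> log, CursorType type, long arg) {
        switch (type) {
            case TRIM_HORIZON:
                return 0;              // oldest retained message
            case LATEST:
                return log.size();     // only messages published later
            default:
                for (int i = 0; i < log.size(); i++) {
                    Msg m = log.get(i);
                    boolean match =
                        (type == CursorType.AT_OFFSET && m.offset >= arg)
                        || (type == CursorType.AFTER_OFFSET && m.offset > arg)
                        || (type == CursorType.AT_TIME && m.timestampMillis >= arg);
                    if (match) {
                        return i;
                    }
                }
                return log.size();
        }
    }

    /** Builds a three-message log with offsets 100..102 and times 1000..3000. */
    static List<Msg> sampleLog() {
        List<Msg> log = new ArrayList<>();
        log.add(new Msg(100, 1000, "a"));
        log.add(new Msg(101, 2000, "b"));
        log.add(new Msg(102, 3000, "c"));
        return log;
    }

    public static void main(String[] args) {
        List<Msg> log = sampleLog();
        for (CursorType t : CursorType.values()) {
            long arg = (t == CursorType.AT_TIME) ? 2000 : 101;
            System.out.println(t + " -> index " + startIndex(log, t, arg));
        }
    }
}
```

In the real service a cursor is an opaque value returned by the API rather than a list index; the index here only stands in for "the next message to read".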
# Source: mushop/charts/events/templates/events-deployment.yaml
env:
  # API credentials
  - name: TENANCY
    valueFrom:
      secretKeyRef:
        name: oci-credentials
        key: tenancy
  - name: REGION
    valueFrom:
      secretKeyRef:
        name: oci-credentials
        key: region
        optional: true
  - name: USER_ID
    valueFrom:
      secretKeyRef:
        name: oci-credentials
        key: user
  - name: PRIVATE_KEY
    valueFrom:
      secretKeyRef:
        name: oci-credentials
        key: privatekey
  - name: FINGERPRINT
    valueFrom:
      secretKeyRef:
        name: oci-credentials
        key: fingerprint
  - name: PASSPHRASE
    valueFrom:
      secretKeyRef:
        name: oci-credentials
        key: passphrase
        optional: true
  # Stream connection
  - name: STREAM_ID
    valueFrom:
      secretKeyRef:
        name: oss-connection
        key: streamId
  - name: MESSAGES_ENDPOINT
    valueFrom:
      secretKeyRef:
        name: oss-connection
        key: messageEndpoint
        optional: true
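The manifest above marks some secret keys as `optional: true`, so a service reading these variables has to tolerate their absence while still failing fast on the mandatory ones. The following is a minimal sketch of that distinction, not the code the events service uses. `EnvSettings`, `required`, and `optional` are hypothetical names; the class takes a plain `Map` so it can be exercised without touching the real environment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper that separates required from optional settings. */
final class EnvSettings {
    private final Map<String, String> env;

    EnvSettings(Map<String, String> env) {
        this.env = env;
    }

    /** Returns the value, or throws if the variable is missing or blank. */
    String required(String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value;
    }

    /** Returns the value if present and non-blank, otherwise an empty Optional. */
    Optional<String> optional(String name) {
        String value = env.get(name);
        return (value == null || value.trim().isEmpty()) ? Optional.empty() : Optional.of(value);
    }

    public static void main(String[] args) {
        // Example data only; in a running pod you would pass System.getenv().
        Map<String, String> sample = new HashMap<>();
        sample.put("TENANCY", "example-tenancy");
        EnvSettings settings = new EnvSettings(sample);
        System.out.println("TENANCY = " + settings.required("TENANCY"));
        System.out.println("REGION present: " + settings.optional("REGION").isPresent());
    }
}
```

Reading everything through one small helper keeps the "which values may be absent" decision in a single place instead of scattering null checks through the client setup.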
final String tenantId = env.getProperty("OCI_TENANT_ID");
final String userId = env.getProperty("OCI_USER_ID");
final String fingerprint = env.getProperty("OCI_FINGERPRINT");
final String privateKey = env.getProperty("OCI_API_KEY");
final String passPhrase = env.getProperty("OCI_PASS_PHRASE");
final String region = env.getProperty("OCI_REGION");
final String compartmentId = env.getProperty("OCI_COMPARTMENT_ID");
final String streamId = env.getProperty("STREAM_ID");
AuthenticationDetailsProvider provider =
    SimpleAuthenticationDetailsProvider.builder()
        .tenantId(tenantId)
        .userId(userId)
        .fingerprint(fingerprint)
        .privateKeySupplier(new StringPrivateKeySupplier(privateKey))
        .region(Region.fromRegionId(region))
        .build();
final StreamAdminClient adminClient = new StreamAdminClient(provider);
Stream stream = getStream(adminClient, compartmentId, streamName, partitions);
streamClient = new StreamClient(provider);
streamClient.setEndpoint(stream.getMessagesEndpoint());
PutMessagesDetails messagesDetails = PutMessagesDetails.builder()
    .messages(messages)
    .build();
PutMessagesRequest putRequest = PutMessagesRequest.builder()
    .streamId(streamsConfig.getStreamId())
    .putMessagesDetails(messagesDetails)
    .build();
PutMessagesResponse putResponse = streamsConfig.getStreamClient()
    .putMessages(putRequest);
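The snippet above takes a ready-made `messages` list. Since a message is a Base64-encoded record, it can help to see what that encoding looks like for a small payload. This is a minimal sketch using only `java.util.Base64`; `MessageCodec`, `encode`, and `decode` are hypothetical names. Whether your own code or the client library performs this step depends on how you call the service, and that detail is not covered here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper showing the Base64 round trip for a message payload. */
final class MessageCodec {

    /** Encodes UTF-8 text as a Base64 string. */
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a Base64 string back into UTF-8 text. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String payload = "{\"type\":\"any\",\"detail\":\"hello\"}";
        String encoded = encode(payload);
        System.out.println(encoded);
        System.out.println(decode(encoded));
    }
}
```

A consumer reading raw responses would apply the reverse step before parsing the JSON.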
CreateCursorDetails cursorDetails =
    CreateCursorDetails.builder()
        .partition(partition)
        .type(Type.Latest)
        //.type(Type.TrimHorizon)
        //.type(Type.AtOffset)
        .build();
CreateCursorRequest createCursorRequest =
    CreateCursorRequest.builder()
        .streamId(streamId)
        .createCursorDetails(cursorDetails)
        .build();
CreateCursorResponse cursorResponse =
    streamClient.createCursor(createCursorRequest);
return cursorResponse.getCursor().getValue();
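for (;;) {
    GetMessagesRequest getRequest =
        GetMessagesRequest.builder()
            .streamId(streamId)
            .cursor(cursor)
            .limit(10)
            .build();
    GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
    // Extract the JSON message from getResponse
    // Print the message
    // Advance the cursor; otherwise every iteration reads from the same position
    cursor = getResponse.getOpcNextCursor();
}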
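A consume loop only makes progress if each request uses the cursor that follows the previous batch. The sketch below models that idea in memory, without the OCI SDK. `PollSketch`, `Batch`, `fetch`, and `drain` are hypothetical names, and the cursor is a plain list index here rather than the opaque value a real service hands back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory model of a polling consumer; illustrative only. */
final class PollSketch {

    /** One response: the messages read and the cursor to use next. */
    static final class Batch {
        final List<String> messages;
        final int nextCursor;

        Batch(List<String> messages, int nextCursor) {
            this.messages = messages;
            this.nextCursor = nextCursor;
        }
    }

    /** Reads at most limit messages starting at cursor. */
    static Batch fetch(List<String> log, int cursor, int limit) {
        int start = Math.min(Math.max(cursor, 0), log.size());
        int end = Math.min(log.size(), start + limit);
        return new Batch(new ArrayList<>(log.subList(start, end)), end);
    }

    /** Polls until a batch comes back empty, advancing the cursor each time. */
    static List<String> drain(List<String> log, int cursor, int limit) {
        List<String> seen = new ArrayList<>();
        while (true) {
            Batch batch = fetch(log, cursor, limit);
            if (batch.messages.isEmpty()) {
                break; // a long-running consumer would wait and poll again instead
            }
            seen.addAll(batch.messages);
            cursor = batch.nextCursor; // without this line the loop never progresses
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(drain(log, 0, 2));
    }
}
```

The sketch stops on the first empty batch so that it terminates; a service consumer would normally keep polling with a short pause between empty reads.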
Create a stream named mushop-stream, leaving other values as their defaults. Note the stream's OCID and Name.
Create a secret named oci-credentials with API credentials to connect services from within the cluster:
kubectl create secret generic oci-credentials \
--from-literal=tenancy='<TENANCY_OCID>' \
--from-literal=user='<USER_OCID>' \
--from-literal=region='<USER_OCI_REGION>' \
--from-literal=fingerprint='<PUBLIC_API_KEY_FINGERPRINT>' \
--from-literal=passphrase='<PRIVATE_API_KEY_PASSPHRASE>' \
--from-file=privatekey='<PATH_OF_PRIVATE_API_KEY>'
Create the oss-connection secret:
kubectl create secret generic oss-connection \
--from-literal=streamId='<STREAM_OCID>' \
--from-literal=messageEndpoint='<MESSAGE_ENDPOINT_URL>'
mv ~/Downloads/myvalues.yaml ./myvalues.yaml
cp mushop/values-dev.yaml ./myvalues.yaml
Make sure myvalues.yaml contains the following:
# Global service configurations
global:
  ociAuthSecret: oci-credentials # OCI authentication credentials secret name
  ossStreamSecret: oss-connection # OCI Streaming secret name
#...
helm del mushop
Install from the deploy/complete/helm-chart directory:
helm install mushop mushop/ \
-f myvalues.yaml
kubectl get pod --watch
Find the events pod name:
kubectl get po
Terminal 1: Tail the events logs:
kubectl logs -f --tail 10 mushop-events-gx8wj
(name will vary)
Terminal 2: Expose the events service:
kubectl port-forward svc/mushop-events 8000:80
Terminal 3: Send a test message to the tracking microservice:
curl -X POST \
  http://localhost:8000/events \
  -H 'Content-Type: application/json' \
  -d '{
    "source": "test",
    "track": "abc1234",
    "events": [{
      "type": "any",
      "detail": "hello"
    }]
  }'
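The same test request can be sent from Java with the JDK's built-in HTTP client (Java 11 or later). This is a sketch under the assumption that the port-forward from Terminal 2 is listening on localhost:8000. `EventsRequest`, `build`, and `send` are hypothetical names, and nothing is sent unless the program is started with the `--send` argument.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client that mirrors the curl test message. */
final class EventsRequest {

    /** Same JSON body as the curl example. */
    static final String PAYLOAD =
        "{\"source\":\"test\",\"track\":\"abc1234\","
        + "\"events\":[{\"type\":\"any\",\"detail\":\"hello\"}]}";

    /** Builds the POST request without sending it. */
    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/events"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(PAYLOAD))
            .build();
    }

    /** Sends the request and returns the HTTP status code. */
    static int send(String baseUrl) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
            client.send(build(baseUrl), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        String baseUrl = "http://localhost:8000"; // assumes the port-forward is running
        HttpRequest request = build(baseUrl);
        System.out.println(request.method() + " " + request.uri());
        if (args.length > 0 && args[0].equals("--send")) {
            System.out.println("HTTP " + send(baseUrl));
        }
    }
}
```

Keeping request construction separate from sending makes it possible to inspect the request without a running cluster.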
👀 Watch the terminals and observe messages being produced by events.
You should also look at the OCI console. Refresh the table and watch your message being published to the stream.