Skip to content

Commit

Permalink
Merge branch 'main' into crystal/refactor-event-processing
Browse files Browse the repository at this point in the history
  • Loading branch information
clemire authored Feb 7, 2025
2 parents 47d41a6 + 90d4cf3 commit d4550ef
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 22 deletions.
179 changes: 179 additions & 0 deletions core/scripts/get_licenses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#! /usr/bin/env python3

import os
import re
import json
import subprocess
import csv
import sys
import argparse


def parse_args():
parser = argparse.ArgumentParser(description='List licenses of Go module dependencies')
parser.add_argument('--show-path', action='store_true',
help='Show path to license files in output')
return parser.parse_args()


def parse_go_mod(go_mod_path):
"""
Parse the go.mod file to extract direct dependencies (not marked as indirect).
Returns a list of module names.
"""
direct_deps = []
in_require_block = False
try:
with open(go_mod_path, 'r') as f:
for line in f:
stripped = line.strip()
# Check for the start of a require block
if stripped.startswith('require ('):
in_require_block = True
continue
if in_require_block:
if stripped == ')':
in_require_block = False
continue
# A typical line: github.com/stretchr/testify v1.7.0 // indirect
m = re.match(r'(\S+)\s+(\S+)(?:\s+//\s*(.*))?', stripped)
if m:
mod, ver, comment = m.groups()
if comment and 'indirect' in comment:
continue
direct_deps.append(mod)
else:
# Handle single-line require directives
if stripped.startswith('require '):
# e.g. require github.com/something v1.0.0
parts = stripped.split()
if len(parts) >= 3:
mod = parts[1]
comment = ' '.join(parts[3:]) if len(parts) > 3 else ''
if 'indirect' in comment:
continue
direct_deps.append(mod)
except Exception as e:
sys.stderr.write(f"Error reading {go_mod_path}: {e}\n")
sys.exit(1)
# Remove duplicates
return list(set(direct_deps))


def get_module_dir(module):
"""
Use 'go mod download -json <module>' to get the directory where the module is downloaded.
Returns the directory path, or None on error.
"""
try:
result = subprocess.run(['go', 'mod', 'download', '-json', module], capture_output=True, text=True)
if result.returncode != 0:
sys.stderr.write(f"Error downloading module {module}: {result.stderr}\n")
return None
info = json.loads(result.stdout)
return info.get('Dir')
except Exception as e:
sys.stderr.write(f"Exception for module {module}: {e}\n")
return None


def find_license_file(module_dir):
"""
Search for a file whose name starts with 'LICENSE' or 'COPYING' (case-insensitive) in the given directory.
Returns the full path of the license file if found, else None.
"""
try:
for entry in os.listdir(module_dir):
entry_lower = entry.lower()
if entry_lower.startswith('license') or entry_lower == 'copying':
return os.path.join(module_dir, entry)
except Exception as e:
sys.stderr.write(f"Error reading directory {module_dir}: {e}\n")
return None


def guess_license(license_text):
"""
Attempt to guess the license type from the text of the license file.
Returns a string representing the license, or 'UNKNOWN' if not determined.
"""
text = license_text.strip()
if 'The Go Authors. All rights reserved.' in text or re.search(r'Copyright\s+\d+\s+The Go Authors', text):
return 'BSD-3-Clause'
if 'MIT License' in text or 'Permission is hereby granted, free of charge' in text:
return 'MIT'
if 'Apache License' in text:
if 'Version 2.0' in text:
return 'Apache-2.0'
else:
return 'Apache'
if 'Mozilla Public License' in text:
m = re.search(r'Mozilla Public License.*?Version\s+([\d\.]+)', text, re.IGNORECASE | re.DOTALL)
if m:
return 'MPL ' + m.group(1)
else:
return 'MPL'
if re.search(r'GNU GENERAL PUBLIC LICENSE', text, re.IGNORECASE):
# Look for first instance of Version after GPL, even if on next line
m = re.search(r'Version\s+([\d\.]+)', text, re.IGNORECASE)
if m:
return 'GPL ' + m.group(1)
else:
return 'GPL'
if 'BSD' in text:
return 'BSD'
return 'UNKNOWN'


def process_dependencies(go_mod_path):
deps = parse_go_mod(go_mod_path)
results = []

for mod in deps:
license_type = 'UNKNOWN'
license_path = ''
module_dir = get_module_dir(mod)
if module_dir:
license_file = find_license_file(module_dir)
if license_file and os.path.isfile(license_file):
try:
with open(license_file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
license_type = guess_license(content)
license_path = license_file
except Exception as e:
sys.stderr.write(f"Error reading license file for module {mod}: {e}\n")
results.append((mod, license_type, license_path))
return results


def output_csv(results, show_path=False):
writer = csv.writer(sys.stdout)
# Write header
headers = ['Library', 'License']
if show_path:
headers.append('License File')
writer.writerow(headers)

for row in results:
if show_path:
writer.writerow(row)
else:
writer.writerow(row[:2]) # Only output library and license


def main():
args = parse_args()

# Assume go.mod is in the current directory
go_mod_path = 'go.mod'
if not os.path.isfile(go_mod_path):
sys.stderr.write('go.mod not found in the current directory.\n')
sys.exit(1)

results = process_dependencies(go_mod_path)
output_csv(results, show_path=args.show_path)


if __name__ == '__main__':
main()
13 changes: 10 additions & 3 deletions packages/encryption/src/decryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ export abstract class BaseDecryptionExtensions {
* upload device keys to the server
*/
public abstract uploadDeviceKeys(): Promise<void>
public abstract getPriorityForStream(streamId: string, highPriorityIds: Set<string>): number
public abstract getPriorityForStream(
streamId: string,
highPriorityIds: Set<string>,
recentStreamIds: Set<string>,
): number

public enqueueNewGroupSessions(
sessions: UserInboxPayload_GroupEncryptionSessions,
Expand Down Expand Up @@ -446,8 +450,11 @@ export abstract class BaseDecryptionExtensions {
}

private compareStreamIds(a: string, b: string): number {
const priorityIds = new Set([...this.highPriorityIds, ...this.recentStreamIds])
return this.getPriorityForStream(a, priorityIds) - this.getPriorityForStream(b, priorityIds)
const recentStreamIds = new Set(this.recentStreamIds)
return (
this.getPriorityForStream(a, this.highPriorityIds, recentStreamIds) -
this.getPriorityForStream(b, this.highPriorityIds, recentStreamIds)
)
}

private lastPrintedAt = 0
Expand Down
6 changes: 5 additions & 1 deletion packages/encryption/src/tests/decryptionExtensions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,11 @@ class MockDecryptionExtensions extends BaseDecryptionExtensions {
return true
}

public getPriorityForStream(_streamId: string, _highPriorityIds: Set<string>): number {
public getPriorityForStream(
_streamId: string,
_highPriorityIds: Set<string>,
_recentStreamIds: Set<string>,
): number {
return 0
}

Expand Down
13 changes: 7 additions & 6 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ export class Client
const unpacked = await unpackStream(response.stream, this.unpackEnvelopeOpts)
await stream.initializeFromResponse(unpacked)
if (stream.view.syncCookie) {
await this.streams.addStreamToSync(stream.view.syncCookie)
this.streams.addStreamToSync(streamId, stream.view.syncCookie)
}
} catch (err) {
this.logError('Failed to create stream', streamId)
Expand Down Expand Up @@ -1415,7 +1415,7 @@ export class Client
this.logCall('initStream: had existing request for', streamIdStr, 'returning promise')
return existingRequest
}
const request = this._initStream(streamId, allowGetStream, persistedData)
const request = this._initStream(streamIdStr, allowGetStream, persistedData)
this.initStreamRequests.set(streamIdStr, request)
let stream: Stream
try {
Expand All @@ -1427,7 +1427,7 @@ export class Client
}

private async _initStream(
streamId: string | Uint8Array,
streamId: string,
allowGetStream: boolean = true,
persistedData?: LoadedStream,
): Promise<Stream> {
Expand All @@ -1446,9 +1446,10 @@ export class Client
const stream = this.createSyncedStream(streamId)

// Try initializing from persistence
if (await stream.initializeFromPersistence(persistedData)) {
const success = await stream.initializeFromPersistence(persistedData)
if (success) {
if (stream.view.syncCookie) {
await this.streams.addStreamToSync(stream.view.syncCookie)
this.streams.addStreamToSync(streamId, stream.view.syncCookie)
}
return stream
}
Expand All @@ -1473,7 +1474,7 @@ export class Client
this.logCall('initStream calling initializingFromResponse', streamId)
await stream.initializeFromResponse(unpacked)
if (stream.view.syncCookie) {
await this.streams.addStreamToSync(stream.view.syncCookie)
this.streams.addStreamToSync(streamId, stream.view.syncCookie)
}
} catch (err) {
this.logError('Failed to initialize stream', streamId, err)
Expand Down
20 changes: 14 additions & 6 deletions packages/sdk/src/clientDecryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,11 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions {
}
}

public getPriorityForStream(streamId: string, highPriorityIds: Set<string>): number {
public getPriorityForStream(
streamId: string,
highPriorityIds: Set<string>,
recentStreamIds: Set<string>,
): number {
if (
isUserDeviceStreamId(streamId) ||
isUserInboxStreamId(streamId) ||
Expand All @@ -324,26 +328,30 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions {
if ((isDmOrGdm || isChannel) && highPriorityIds.has(streamId)) {
return 1
}
// if you're getting updates for this stream, decrypt them so that you see unread messages
if (recentStreamIds.has(streamId)) {
return 2
}
// channels in the space we're currently viewing
if (isChannel) {
const spaceId = spaceIdFromChannelId(streamId)
if (highPriorityIds.has(spaceId)) {
return 2
return 3
}
}
// dms
if (isDmOrGdm) {
return 3
return 4
}
// space that we're currently viewing
if (highPriorityIds.has(streamId)) {
return 4
return 5
}
// then other channels,
if (isChannel) {
return 5
return 6
}
// then other spaces
return 6
return 7
}
}
8 changes: 5 additions & 3 deletions packages/sdk/src/syncedStreams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,18 @@ export class SyncedStreams {
}

// adds stream to the sync subscription
public async addStreamToSync(syncCookie: SyncCookie): Promise<void> {
const streamId = streamIdAsString(syncCookie.streamId)
public addStreamToSync(streamId: string, syncCookie: SyncCookie): void {
if (!this.syncedStreamsLoop) {
return
}
this.log('addStreamToSync', streamId)
const stream = this.streams.get(streamId)
if (!stream) {
// perhaps we called stopSync while loading a stream from persistence
this.logError('streamId not in this.streams, not adding to sync', streamId)
return
}
await this.syncedStreamsLoop?.addStreamToSync(syncCookie, stream)
this.syncedStreamsLoop.addStreamToSync(streamId, syncCookie, stream)
}

// remove stream from the sync subsbscription
Expand Down
3 changes: 1 addition & 2 deletions packages/sdk/src/syncedStreamsLoop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ export class SyncedStreamsLoop {
}

// adds stream to the sync subscription
public async addStreamToSync(syncCookie: SyncCookie, stream: ISyncedStream): Promise<void> {
const streamId = streamIdAsString(syncCookie.streamId)
public addStreamToSync(streamId: string, syncCookie: SyncCookie, stream: ISyncedStream) {
this.logDebug('addStreamToSync', streamId)
if (this.streams.has(streamId)) {
this.log('stream already in sync', streamId)
Expand Down
5 changes: 4 additions & 1 deletion packages/sdk/src/tests/multi_ne/syncedStreams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ describe('syncStreams', () => {
await done1.promise

alicesSyncedStreams.set(alicesUserInboxStreamIdStr, userInboxStream)
await alicesSyncedStreams.addStreamToSync(userInboxStream.view.syncCookie!)
alicesSyncedStreams.addStreamToSync(
alicesUserInboxStreamIdStr,
userInboxStream.view.syncCookie!,
)

// some helper functions
const addEvent = async (payload: PlainMessage<StreamEvent>['payload']) => {
Expand Down

0 comments on commit d4550ef

Please sign in to comment.