[mastodon-client] Implement streaming API

This commit is contained in:
Laura Hausmann 2023-10-08 16:36:20 +02:00
parent 79a4259305
commit 878970d318
Signed by: zotan
GPG key ID: D044E84C5BE01605
24 changed files with 865 additions and 29 deletions

View file

@ -1,7 +1,8 @@
import type { Packed } from "./schema.js";
import { Note } from "@/models/entities/note.js";
export function isInstanceMuted(
note: Packed<"Note">,
note: Packed<"Note"> | Note,
mutedInstances: Set<string>,
): boolean {
if (mutedInstances.has(note?.user?.host ?? "")) return true;

View file

@ -45,7 +45,7 @@ import { extractApMentions } from "./mention.js";
import DbResolver from "../db-resolver.js";
import { StatusError } from "@/misc/fetch.js";
import { shouldBlockInstance } from "@/misc/should-block-instance.js";
import { publishNoteStream } from "@/services/stream.js";
import { publishNoteStream, publishNoteUpdatesStream } from "@/services/stream.js";
import { extractHashtags } from "@/misc/extract-hashtags.js";
import { UserProfiles } from "@/models/index.js";
import { In } from "typeorm";
@ -760,5 +760,12 @@ export async function updateNote(value: string | IObject, resolver?: Resolver) {
publishNoteStream(note.id, "updated", {
updatedAt: update.updatedAt,
});
const updatedNote = {
...note,
...update
};
publishNoteUpdatesStream("updated", updatedNote);
}
}

View file

@ -1,6 +1,7 @@
import define from "../../../define.js";
import { Announcements } from "@/models/index.js";
import { genId } from "@/misc/gen-id.js";
import { publishBroadcastStream } from "@/services/stream.js";
export const meta = {
tags: ["admin"],
@ -85,6 +86,8 @@ export default define(meta, paramDef, async (ps) => {
isGoodNews: ps.isGoodNews ?? false,
}).then((x) => Announcements.findOneByOrFail(x.identifiers[0]));
publishBroadcastStream("announcementAdded", announcement);
return Object.assign({}, announcement, {
createdAt: announcement.createdAt.toISOString(),
updatedAt: null,

View file

@ -1,6 +1,7 @@
import define from "../../../define.js";
import { Announcements } from "@/models/index.js";
import { ApiError } from "../../../error.js";
import { publishBroadcastStream } from "@/services/stream.js";
export const meta = {
tags: ["admin"],
@ -30,5 +31,6 @@ export default define(meta, paramDef, async (ps, me) => {
if (announcement == null) throw new ApiError(meta.errors.noSuchAnnouncement);
publishBroadcastStream("announcementDeleted", announcement.id);
await Announcements.delete(announcement.id);
});

View file

@ -18,7 +18,8 @@ import { awaitAll } from "@/prelude/await-all.js";
import { UserHelpers } from "@/server/api/mastodon/helpers/user.js";
import { IsNull } from "typeorm";
import { MfmHelpers } from "@/server/api/mastodon/helpers/mfm.js";
import { MastoContext } from "@/server/api/mastodon/index.js";
import { getStubMastoContext, MastoContext } from "@/server/api/mastodon/index.js";
import { NoteHelpers } from "@/server/api/mastodon/helpers/note.js";
export class NoteConverter {
public static async encode(note: Note, ctx: MastoContext, recurse: boolean = true): Promise<MastodonEntity.Status> {
@ -164,4 +165,10 @@ export class NoteConverter {
};
});
}
public static async encodeEvent(note: Note, user: ILocalUser | undefined): Promise<MastodonEntity.Status> {
const ctx = getStubMastoContext(user);
NoteHelpers.fixupEventNote(note);
return NoteConverter.encode(note, ctx);
}
}

View file

@ -6,7 +6,8 @@ import { UserHelpers } from "@/server/api/mastodon/helpers/user.js";
import { awaitAll } from "@/prelude/await-all.js";
import { NoteConverter } from "@/server/api/mastodon/converters/note.js";
import { getNote } from "@/server/api/common/getters.js";
import { MastoContext } from "@/server/api/mastodon/index.js";
import { getStubMastoContext, MastoContext } from "@/server/api/mastodon/index.js";
import { Notifications } from "@/models/index.js";
type NotificationType = typeof notificationTypes[number];
@ -26,11 +27,13 @@ export class NotificationConverter {
type: this.encodeNotificationType(notification.type),
};
if (notification.note) {
const isPureRenote = notification.note.renoteId !== null && notification.note.text === null;
const note = notification.note ?? (notification.noteId ? await getNote(notification.noteId, localUser) : null);
if (note) {
const isPureRenote = note.renoteId !== null && note.text === null;
const encodedNote = isPureRenote
? getNote(notification.note.renoteId!, localUser).then(note => NoteConverter.encode(note, ctx))
: NoteConverter.encode(notification.note, ctx);
? getNote(note.renoteId!, localUser).then(note => NoteConverter.encode(note, ctx))
: NoteConverter.encode(note, ctx);
result = Object.assign(result, {
status: encodedNote,
});
@ -78,4 +81,10 @@ export class NotificationConverter {
throw new Error(`Notification type ${t} not supported`);
}
}
public static async encodeEvent(target: Notification["id"], user: ILocalUser): Promise<MastodonEntity.Notification | null> {
const ctx = getStubMastoContext(user);
const notification = await Notifications.findOneByOrFail({ id: target });
return this.encode(notification, ctx).catch(_ => null);
}
}

View file

@ -0,0 +1,7 @@
import Router from "@koa/router";
export function setupEndpointsStreaming(router: Router): void {
router.get("/v1/streaming/health", async (ctx) => {
ctx.body = "OK";
});
}

View file

@ -48,7 +48,7 @@ export class MiscHelpers {
email: meta.maintainerEmail || "",
version: `4.1.0 (compatible; Iceshrimp ${config.version})`,
urls: {
streaming_api: `${config.url.replace(/^http(?=s?:\/\/)/, "ws")}/streaming`,
streaming_api: `${config.url.replace(/^http(?=s?:\/\/)/, "ws")}/mastodon`,
},
stats: awaitAll({
user_count: userCount,

View file

@ -23,13 +23,13 @@ import { VisibilityConverter } from "@/server/api/mastodon/converters/visibility
import mfm from "mfm-js";
import { FileConverter } from "@/server/api/mastodon/converters/file.js";
import { MfmHelpers } from "@/server/api/mastodon/helpers/mfm.js";
import { toArray } from "@/prelude/array.js";
import { toArray, unique } from "@/prelude/array.js";
import { MastoApiError } from "@/server/api/mastodon/middleware/catch-errors.js";
import { Cache } from "@/misc/cache.js";
import AsyncLock from "async-lock";
import { IdentifiableError } from "@/misc/identifiable-error.js";
import { IsNull } from "typeorm";
import { MastoContext } from "@/server/api/mastodon/index.js";
import { getStubMastoContext, MastoContext } from "@/server/api/mastodon/index.js";
export class NoteHelpers {
public static postIdempotencyCache = new Cache<{ status?: MastodonEntity.Status }>('postIdempotencyCache', 60 * 60);
@ -412,6 +412,33 @@ export class NoteHelpers {
});
}
public static async getConversationFromEvent(noteId: string, user: ILocalUser): Promise<MastodonEntity.Conversation> {
const ctx = getStubMastoContext(user);
const note = await getNote(noteId, ctx.user);
const conversationId = note.threadId ?? note.id;
const userIds = unique([note.userId].concat(note.visibleUserIds).filter(p => p != ctx.user.id));
const users = userIds.map(id => UserHelpers.getUserCached(id, ctx).catch(_ => null));
const accounts = Promise.all(users).then(u => UserConverter.encodeMany(u.filter(u => u) as User[], ctx));
const res = {
id: conversationId,
accounts: accounts.then(u => u.length > 0 ? u : UserConverter.encodeMany([ctx.user], ctx)), // failsafe to prevent apps from crashing case when all participant users have been deleted
last_status: NoteConverter.encode(note, ctx),
unread: true
};
return awaitAll(res);
}
public static fixupEventNote(note: Note): Note {
note.createdAt = note.createdAt ? new Date(note.createdAt) : note.createdAt;
note.updatedAt = note.updatedAt ? new Date(note.updatedAt) : note.updatedAt;
note.reply = null;
note.renote = null;
note.user = null;
return note;
}
public static getIdempotencyKey(ctx: MastoContext): string | null {
const headers = ctx.headers;
const user = ctx.user as ILocalUser;

View file

@ -52,7 +52,6 @@ export class TimelineHelpers {
generateMutedUserRenotesQueryForNotes(query, user);
query.andWhere("note.visibility != 'hidden'");
query.andWhere("note.visibility != 'specified'");
return PaginationHelpers.execQueryLinkPagination(query, limit, minId !== undefined, ctx);
}

View file

@ -18,6 +18,9 @@ import { KoaBodyMiddleware } from "@/server/api/mastodon/middleware/koa-body.js"
import { NormalizeQueryMiddleware } from "@/server/api/mastodon/middleware/normalize-query.js";
import { PaginationMiddleware } from "@/server/api/mastodon/middleware/pagination.js";
import { SetHeadersMiddleware } from "@/server/api/mastodon/middleware/set-headers.js";
import { UserHelpers } from "@/server/api/mastodon/helpers/user.js";
import { ILocalUser } from "@/models/entities/user.js";
import { setupEndpointsStreaming } from "@/server/api/mastodon/endpoints/streaming.js";
export const logger = apiLogger.createSubLogger("mastodon");
export type MastoContext = RouterContext & DefaultContext;
@ -30,6 +33,7 @@ export function setupMastodonApi(router: Router): void {
setupEndpointsFilter(router);
setupEndpointsTimeline(router);
setupEndpointsNotifications(router);
setupEndpointsStreaming(router);
setupEndpointsSearch(router);
setupEndpointsMedia(router);
setupEndpointsList(router);
@ -45,3 +49,10 @@ function setupMiddleware(router: Router): void {
router.use(AuthMiddleware);
router.use(CacheMiddleware);
}
export function getStubMastoContext(user: ILocalUser | null | undefined): any {
return {
user: user ?? null,
cache: UserHelpers.getFreshAccountCache()
};
}

View file

@ -0,0 +1,48 @@
import { MastodonStreamingConnection } from ".";
export abstract class MastodonStream {
protected connection: MastodonStreamingConnection;
public readonly chName: string;
public static readonly shouldShare: boolean;
public static readonly requireCredential: boolean;
public static readonly requiredScopes: string[] = [];
protected get user() {
return this.connection.user;
}
protected get userProfile() {
return this.connection.userProfile;
}
protected get following() {
return this.connection.following;
}
protected get muting() {
return this.connection.muting;
}
protected get renoteMuting() {
return this.connection.renoteMuting;
}
protected get blocking() {
return this.connection.blocking;
}
protected get subscriber() {
return this.connection.subscriber;
}
protected constructor(connection: MastodonStreamingConnection, name: string) {
this.chName = name;
this.connection = connection;
}
public abstract init(params: any): void;
public dispose?(): void;
public onMessage?(type: string, body: any): void;
}

View file

@ -0,0 +1,72 @@
import { MastodonStream } from "../channel.js";
import { Note } from "@/models/entities/note.js";
import { NoteConverter } from "@/server/api/mastodon/converters/note.js";
import { StreamMessages } from "@/server/api/stream/types.js";
import { NoteHelpers } from "@/server/api/mastodon/helpers/note.js";
import { Packed } from "@/misc/schema.js";
export class MastodonStreamDirect extends MastodonStream {
public static shouldShare = true;
public static requireCredential = true;
public static requiredScopes = ['read:statuses'];
constructor(connection: MastodonStream["connection"], name: string) {
super(connection, name);
this.onNote = this.onNote.bind(this);
this.onNoteEvent = this.onNoteEvent.bind(this);
}
override get user() {
return this.connection.user!;
}
public async init() {
this.subscriber.on("notesStream", this.onNote);
this.subscriber.on("noteUpdatesStream", this.onNoteEvent);
}
private async onNote(note: Note) {
if (!this.shouldProcessNote(note)) return;
NoteConverter.encodeEvent(note, this.user).then(encoded => {
this.connection.send(this.chName, "update", encoded);
});
NoteHelpers.getConversationFromEvent(note.id, this.user).then(conversation => {
this.connection.send(this.chName, "conversation", conversation);
});
}
private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) {
const note = data.body;
if (!this.shouldProcessNote(note)) return;
NoteHelpers.getConversationFromEvent(note.id, this.user).then(conversation => {
this.connection.send(this.chName, "conversation", conversation);
});
switch (data.type) {
case "updated":
NoteConverter.encodeEvent(note, this.user).then(encoded => {
this.connection.send(this.chName, "status.update", encoded);
});
break;
case "deleted":
this.connection.send(this.chName, "delete", note.id);
break;
default:
break;
}
}
private shouldProcessNote(note: Note | Packed<"Note">): boolean {
if (note.visibility !== "specified") return false;
if (note.userId !== this.user.id && !note.visibleUserIds?.includes(this.user.id)) return false;
return true;
}
public dispose() {
this.subscriber.off("notesStream", this.onNote);
this.subscriber.off("noteUpdatesStream", this.onNoteEvent);
}
}

View file

@ -0,0 +1,89 @@
import { MastodonStream } from "../channel.js";
import { getWordHardMute } from "@/misc/check-word-mute.js";
import { Note } from "@/models/entities/note.js";
import { NoteConverter } from "@/server/api/mastodon/converters/note.js";
import { StreamMessages } from "@/server/api/stream/types.js";
import { Packed } from "@/misc/schema.js";
import { User } from "@/models/entities/user.js";
import { UserListJoinings } from "@/models/index.js";
export class MastodonStreamList extends MastodonStream {
public static shouldShare = false;
public static requireCredential = true;
public static requiredScopes = ['read:statuses'];
private readonly listId: string;
private listUsers: User["id"][] = [];
private listUsersClock: NodeJS.Timer;
constructor(connection: MastodonStream["connection"], name: string, list: string) {
super(connection, name);
this.listId = list;
this.onNote = this.onNote.bind(this);
this.onNoteEvent = this.onNoteEvent.bind(this);
this.updateListUsers = this.updateListUsers.bind(this);
}
override get user() {
return this.connection.user!;
}
public async init() {
if (!this.listId) return;
this.subscriber.on("notesStream", this.onNote);
this.subscriber.on("noteUpdatesStream", this.onNoteEvent);
this.updateListUsers();
this.listUsersClock = setInterval(this.updateListUsers, 5000);
}
private async updateListUsers() {
const users = await UserListJoinings.find({
where: {
userListId: this.listId,
},
select: ["userId"],
});
this.listUsers = users.map((x) => x.userId);
}
private async onNote(note: Note) {
if (!await this.shouldProcessNote(note)) return;
const encoded = await NoteConverter.encodeEvent(note, this.user)
this.connection.send(this.chName, "update", encoded);
}
private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) {
const note = data.body;
if (!await this.shouldProcessNote(note)) return;
switch (data.type) {
case "updated":
const encoded = await NoteConverter.encodeEvent(note, this.user);
this.connection.send(this.chName, "status.update", encoded);
break;
case "deleted":
this.connection.send(this.chName, "delete", note.id);
break;
default:
break;
}
}
private async shouldProcessNote(note: Note | Packed<"Note">): Promise<boolean> {
if (!this.listUsers.includes(note.userId)) return false;
if (note.channelId) return false;
if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false;
if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false;
if (note.visibility === "specified") return !!note.visibleUserIds?.includes(this.user.id);
if (note.visibility === "followers") return this.following.has(note.userId);
return true;
}
public dispose() {
this.subscriber.off("notesStream", this.onNote);
this.subscriber.off("noteUpdatesStream", this.onNoteEvent);
clearInterval(this.listUsersClock);
}
}

View file

@ -0,0 +1,80 @@
import { MastodonStream } from "../channel.js";
import { getWordHardMute } from "@/misc/check-word-mute.js";
import { isUserRelated } from "@/misc/is-user-related.js";
import { isInstanceMuted } from "@/misc/is-instance-muted.js";
import { Note } from "@/models/entities/note.js";
import { NoteConverter } from "@/server/api/mastodon/converters/note.js";
import { StreamMessages } from "@/server/api/stream/types.js";
import { fetchMeta } from "@/misc/fetch-meta.js";
export class MastodonStreamPublic extends MastodonStream {
public static shouldShare = true;
public static requireCredential = false;
private readonly mediaOnly: boolean;
private readonly localOnly: boolean;
private readonly remoteOnly: boolean;
constructor(connection: MastodonStream["connection"], name: string) {
super(connection, name);
this.mediaOnly = name.endsWith(":media");
this.localOnly = name.startsWith("public:local");
this.remoteOnly = name.startsWith("public:remote");
this.onNote = this.onNote.bind(this);
this.onNoteEvent = this.onNoteEvent.bind(this);
}
public async init() {
const meta = await fetchMeta();
if (meta.disableGlobalTimeline) {
if (this.user == null || !(this.user.isAdmin || this.user.isModerator))
return;
}
this.subscriber.on("notesStream", this.onNote);
this.subscriber.on("noteUpdatesStream", this.onNoteEvent);
}
private async onNote(note: Note) {
if (!await this.shouldProcessNote(note)) return;
const encoded = await NoteConverter.encodeEvent(note, this.user)
this.connection.send(this.chName, "update", encoded);
}
private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) {
const note = data.body;
if (!await this.shouldProcessNote(note)) return;
switch (data.type) {
case "updated":
const encoded = await NoteConverter.encodeEvent(note, this.user);
this.connection.send(this.chName, "status.update", encoded);
break;
case "deleted":
this.connection.send(this.chName, "delete", note.id);
break;
default:
break;
}
}
private async shouldProcessNote(note: Note): Promise<boolean> {
if (note.visibility !== "public") return false;
if (note.channelId != null) return false;
if (this.mediaOnly && note.fileIds.length < 1) return false;
if (this.localOnly && note.userHost !== null) return false;
if (this.remoteOnly && note.userHost === null) return false;
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return false;
if (isUserRelated(note, this.muting)) return false;
if (isUserRelated(note, this.blocking)) return false;
if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false;
if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false;
return true;
}
public dispose() {
this.subscriber.off("notesStream", this.onNote);
this.subscriber.off("noteUpdatesStream", this.onNoteEvent);
}
}

View file

@ -0,0 +1,75 @@
import { MastodonStream } from "../channel.js";
import { getWordHardMute } from "@/misc/check-word-mute.js";
import { isUserRelated } from "@/misc/is-user-related.js";
import { isInstanceMuted } from "@/misc/is-instance-muted.js";
import { Note } from "@/models/entities/note.js";
import { NoteConverter } from "@/server/api/mastodon/converters/note.js";
import { StreamMessages } from "@/server/api/stream/types.js";
export class MastodonStreamTag extends MastodonStream {
public static shouldShare = false;
public static requireCredential = false;
private readonly localOnly: boolean;
private readonly tag: string;
constructor(connection: MastodonStream["connection"], name: string, tag: string) {
super(connection, name);
this.tag = tag;
this.localOnly = name.startsWith("hashtag:local");
this.onNote = this.onNote.bind(this);
this.onNoteEvent = this.onNoteEvent.bind(this);
}
override get user() {
return this.connection.user!;
}
public async init() {
if (!this.tag) return;
this.subscriber.on("notesStream", this.onNote);
this.subscriber.on("noteUpdatesStream", this.onNoteEvent);
}
private async onNote(note: Note) {
if (!await this.shouldProcessNote(note)) return;
const encoded = await NoteConverter.encodeEvent(note, this.user)
this.connection.send(this.chName, "update", encoded);
}
private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) {
const note = data.body;
if (!await this.shouldProcessNote(note)) return;
switch (data.type) {
case "updated":
const encoded = await NoteConverter.encodeEvent(note, this.user);
this.connection.send(this.chName, "status.update", encoded);
break;
case "deleted":
this.connection.send(this.chName, "delete", note.id);
break;
default:
break;
}
}
private async shouldProcessNote(note: Note): Promise<boolean> {
if (note.visibility !== "public") return false;
if (note.channelId != null) return false;
if (this.localOnly && note.userHost !== null) return false;
if (!note.tags?.includes(this.tag)) return false;
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return false;
if (isUserRelated(note, this.muting)) return false;
if (isUserRelated(note, this.blocking)) return false;
if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false;
if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false;
return true;
}
public dispose() {
this.subscriber.off("notesStream", this.onNote);
this.subscriber.off("noteUpdatesStream", this.onNoteEvent);
}
}

View file

@ -0,0 +1,112 @@
import { MastodonStream } from "../channel.js";
import { getWordHardMute } from "@/misc/check-word-mute.js";
import { isUserRelated } from "@/misc/is-user-related.js";
import { isInstanceMuted } from "@/misc/is-instance-muted.js";
import { Note } from "@/models/entities/note.js";
import { NoteConverter } from "@/server/api/mastodon/converters/note.js";
import { StreamMessages } from "@/server/api/stream/types.js";
import { NotificationConverter } from "@/server/api/mastodon/converters/notification.js";
import { AnnouncementConverter } from "@/server/api/mastodon/converters/announcement.js";
export class MastodonStreamUser extends MastodonStream {
public static shouldShare = true;
public static requireCredential = true;
public static requiredScopes = ['read:statuses', 'read:notifications'];
private readonly notificationsOnly: boolean;
constructor(connection: MastodonStream["connection"], name: string) {
super(connection, name);
this.notificationsOnly = name === "user:notification";
this.onNote = this.onNote.bind(this);
this.onNoteEvent = this.onNoteEvent.bind(this);
this.onUserEvent = this.onUserEvent.bind(this);
this.onBroadcastEvent = this.onBroadcastEvent.bind(this);
}
override get user() {
return this.connection.user!;
}
public async init() {
this.subscriber.on(`mainStream:${this.user.id}`, this.onUserEvent);
if (!this.notificationsOnly) {
this.subscriber.on("notesStream", this.onNote);
this.subscriber.on("noteUpdatesStream", this.onNoteEvent);
this.subscriber.on("broadcast", this.onBroadcastEvent);
}
}
private async onNote(note: Note) {
if (!await this.shouldProcessNote(note)) return;
const encoded = await NoteConverter.encodeEvent(note, this.user)
this.connection.send(this.chName, "update", encoded);
}
private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) {
const note = data.body;
if (!await this.shouldProcessNote(note)) return;
switch (data.type) {
case "updated":
const encoded = await NoteConverter.encodeEvent(note, this.user);
this.connection.send(this.chName, "status.update", encoded);
break;
case "deleted":
this.connection.send(this.chName, "delete", note.id);
break;
default:
break;
}
}
private async onUserEvent(data: StreamMessages["main"]["payload"]) {
switch (data.type) {
case "notification":
const encoded = await NotificationConverter.encodeEvent(data.body.id, this.user);
if (encoded) this.connection.send(this.chName, "notification", encoded);
break;
default:
break;
}
}
private async onBroadcastEvent(data: StreamMessages["broadcast"]["payload"]) {
switch (data.type) {
case "announcementAdded":
// This shouldn't be necessary but is for some reason
data.body.createdAt = new Date(data.body.createdAt);
this.connection.send(this.chName, "announcement", AnnouncementConverter.encode(data.body, false));
break;
case "announcementDeleted":
this.connection.send(this.chName, "announcement.delete", data.body);
break;
default:
break;
}
}
private async shouldProcessNote(note: Note): Promise<boolean> {
if (note.visibility === "hidden") return false;
if (note.visibility === "specified") return note.userId === this.user.id || note.visibleUserIds?.includes(this.user.id);
if (note.channelId) return false;
if (this.user!.id !== note.userId && !this.following.has(note.userId)) return false;
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return false;
if (isUserRelated(note, this.muting)) return false;
if (isUserRelated(note, this.blocking)) return false;
if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false;
if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false;
return true;
}
public dispose() {
this.subscriber.off(`mainStream:${this.user.id}`, this.onUserEvent);
if (!this.notificationsOnly) {
this.subscriber.off("notesStream", this.onNote);
this.subscriber.off("noteUpdatesStream", this.onNoteEvent);
this.subscriber.off("broadcast", this.onBroadcastEvent);
}
}
}

View file

@ -0,0 +1,261 @@
import type { EventEmitter } from "events";
import type * as websocket from "websocket";
import type { ILocalUser, User } from "@/models/entities/user.js";
import type { MastodonStream } from "./channel.js";
import { Blockings, Followings, Mutings, RenoteMutings, UserProfiles, } from "@/models/index.js";
import type { AccessToken } from "@/models/entities/access-token.js";
import type { UserProfile } from "@/models/entities/user-profile.js";
import { StreamEventEmitter, StreamMessages } from "@/server/api/stream/types.js";
import { apiLogger } from "@/server/api/logger.js";
import { MastodonStreamUser } from "@/server/api/mastodon/streaming/channels/user.js";
import { MastodonStreamDirect } from "@/server/api/mastodon/streaming/channels/direct.js";
import { MastodonStreamPublic } from "@/server/api/mastodon/streaming/channels/public.js";
import { MastodonStreamList } from "@/server/api/mastodon/streaming/channels/list.js";
import { ParsedUrlQuery } from "querystring";
import { toSingleLast } from "@/prelude/array.js";
import { MastodonStreamTag } from "@/server/api/mastodon/streaming/channels/tag.js";
import { AuthConverter } from "@/server/api/mastodon/converters/auth.js";
const logger = apiLogger.createSubLogger("streaming").createSubLogger("mastodon");
const channels: Record<string, any> = {
"user": MastodonStreamUser,
"user:notification": MastodonStreamUser,
"direct": MastodonStreamDirect,
"list": MastodonStreamList,
"public": MastodonStreamPublic,
"public:media": MastodonStreamPublic,
"public:local": MastodonStreamPublic,
"public:local:media": MastodonStreamPublic,
"public:remote": MastodonStreamPublic,
"public:remote:media": MastodonStreamPublic,
"hashtag": MastodonStreamTag,
"hashtag:local": MastodonStreamTag,
}
export class MastodonStreamingConnection {
public user?: ILocalUser;
public userProfile?: UserProfile | null;
public following: Set<User["id"]> = new Set();
public muting: Set<User["id"]> = new Set();
public renoteMuting: Set<User["id"]> = new Set();
public blocking: Set<User["id"]> = new Set();
public token?: AccessToken;
private wsConnection: websocket.connection;
private channels: MastodonStream[] = [];
public subscriber: StreamEventEmitter;
constructor(
wsConnection: websocket.connection,
subscriber: EventEmitter,
user: ILocalUser | null | undefined,
token: AccessToken | null | undefined,
query: ParsedUrlQuery,
) {
const channel = toSingleLast(query.stream);
logger.debug(`New connection on channel: ${channel}`);
this.wsConnection = wsConnection;
this.subscriber = subscriber;
if (user) this.user = user;
if (token) this.token = token;
this.onMessage = this.onMessage.bind(this);
this.onUserEvent = this.onUserEvent.bind(this);
this.wsConnection.on("message", this.onMessage);
if (this.user) {
this.updateFollowing();
this.updateMuting();
this.updateRenoteMuting();
this.updateBlocking();
this.updateUserProfile();
this.subscriber.on(`user:${this.user.id}`, this.onUserEvent);
}
if (channel) {
const list = toSingleLast(query.list);
const tag = toSingleLast(query.tag);
this.onMessage({
type: "utf8",
utf8Data: JSON.stringify({ stream: channel, type: "subscribe", list, tag }),
});
}
}
private onUserEvent(data: StreamMessages["user"]["payload"]) {
switch (data.type) {
case "follow":
this.following.add(data.body.id);
break;
case "unfollow":
this.following.delete(data.body.id);
break;
case "mute":
this.muting.add(data.body.id);
break;
case "unmute":
this.muting.delete(data.body.id);
break;
// TODO: renote mute events
// TODO: block events
case "updateUserProfile":
this.userProfile = data.body;
break;
case "terminate":
this.closeConnection();
break;
default:
break;
}
}
private async onMessage(data: websocket.Message) {
if (data.type !== "utf8") return;
if (data.utf8Data == null) return;
let message: Record<string, any>;
try {
message = JSON.parse(data.utf8Data);
} catch (e) {
logger.error("Failed to parse json data, ignoring");
return;
}
const { stream, type, list, tag } = message;
if (!message.stream || !message.type) {
logger.error("Invalid message received, ignoring");
return;
}
if (list ?? tag)
logger.info(`${type}: ${stream} ${list ?? tag}`);
else
logger.info(`${type}: ${stream}`);
switch (type) {
case "subscribe":
this.connectChannel(stream, list, tag);
break;
case "unsubscribe":
this.disconnectChannel(stream);
break;
}
}
public send(stream: string, event: string, payload: any) {
const json = JSON.stringify({
stream: [stream],
event: event,
payload: typeof payload === "string" ? payload : JSON.stringify(payload),
})
this.wsConnection.send(json);
}
public connectChannel(channel: string, list?: string, tag?: string) {
if (channels[channel].requireCredential) {
if (this.user == null) {
logger.info(`Refusing connection to channel ${channel} without authentication, terminating connection`);
this.closeConnection();
return;
} else if (!AuthConverter.decode(channels[channel].requiredScopes).every(p => this.token?.permission.includes(p))) {
logger.info(`Refusing connection to channel ${channel} without required OAuth scopes, terminating connection`);
this.closeConnection();
return;
}
}
if (
channels[channel].shouldShare &&
this.channels.some((c) => c.chName === channel)
) {
return;
}
let ch: MastodonStream;
if (channel === "list") {
ch = new channels[channel](this, channel, list);
} else if (channel.startsWith("hashtag"))
ch = new channels[channel](this, channel, tag);
else
ch = new channels[channel](this, channel);
this.channels.push(ch);
ch.init(null);
}
public disconnectChannel(channelName: string) {
const channel = this.channels.find((c) => c.chName === channelName);
if (channel) {
if (channel.dispose) channel.dispose();
this.channels = this.channels.filter((c) => c.chName !== channelName);
}
}
private async updateFollowing() {
const followings = await Followings.find({
where: {
followerId: this.user!.id,
},
select: ["followeeId"],
});
this.following = new Set<string>(followings.map((x) => x.followeeId));
}
private async updateMuting() {
const mutings = await Mutings.find({
where: {
muterId: this.user!.id,
},
select: ["muteeId"],
});
this.muting = new Set<string>(mutings.map((x) => x.muteeId));
}
private async updateRenoteMuting() {
const renoteMutings = await RenoteMutings.find({
where: {
muterId: this.user!.id,
},
select: ["muteeId"],
});
this.renoteMuting = new Set<string>(renoteMutings.map((x) => x.muteeId));
}
private async updateBlocking() {
const blockings = await Blockings.find({
where: {
blockeeId: this.user!.id,
},
select: ["blockerId"],
});
this.blocking = new Set<string>(blockings.map((x) => x.blockerId));
}
private async updateUserProfile() {
this.userProfile = await UserProfiles.findOneBy({
userId: this.user!.id,
});
}
public closeConnection() {
this.wsConnection.close();
this.dispose();
}
public dispose() {
for (const c of this.channels.filter((c) => c.dispose)) {
if (c.dispose) c.dispose();
}
}
}

View file

@ -56,7 +56,6 @@ export default class Connection {
accessToken: string,
prepareStream: string | undefined,
) {
console.log("constructor", prepareStream);
this.wsConnection = wsConnection;
this.subscriber = subscriber;
if (user) this.user = user;
@ -85,7 +84,6 @@ export default class Connection {
this.subscriber.on(`user:${this.user.id}`, this.onUserEvent);
}
console.log("prepare", prepareStream);
if (prepareStream) {
this.onWsConnectionMessage({
type: "utf8",

View file

@ -15,6 +15,7 @@ import type { Signin } from "@/models/entities/signin.js";
import type { Page } from "@/models/entities/page.js";
import type { Packed } from "@/misc/schema.js";
import type { Webhook } from "@/models/entities/webhook";
import { Announcement } from "@/models/entities/announcement.js";
//#region Stream type-body definitions
export interface InternalStreamTypes {
@ -59,6 +60,8 @@ export interface BroadcastTypes {
emojiAdded: {
emoji: Packed<"Emoji">;
};
announcementAdded: Announcement;
announcementDeleted: Announcement["id"];
}
export interface UserStreamTypes {
@ -162,6 +165,11 @@ type NoteStreamEventTypes = {
};
};
export interface NoteUpdatesStreamTypes {
deleted: Note;
updated: Note;
}
export interface ChannelStreamTypes {
typing: User["id"];
}
@ -271,6 +279,10 @@ export type StreamMessages = {
name: "notesStream";
payload: Note;
};
noteUpdates: {
name: "noteUpdatesStream";
payload: EventUnionFromDictionary<NoteUpdatesStreamTypes>;
};
};
// API event definitions

View file

@ -7,6 +7,10 @@ import { subscriber as redisClient } from "@/db/redis.js";
import { Users } from "@/models/index.js";
import MainStreamConnection from "./stream/index.js";
import authenticate from "./authenticate.js";
import { apiLogger } from "@/server/api/logger.js";
import { MastodonStreamingConnection } from "@/server/api/mastodon/streaming/index.js";
export const streamingLogger = apiLogger.createSubLogger("streaming");
export const initializeStreamingServer = (server: http.Server) => {
// Init websocket server
@ -48,17 +52,15 @@ export const initializeStreamingServer = (server: http.Server) => {
redisClient.on("message", onRedisMessage);
const host = `https://${request.host}`;
const prepareStream = q.stream?.toString();
console.log("start", q);
const main = new MainStreamConnection(
connection,
ev,
user,
app,
host,
accessToken,
prepareStream,
);
const isMastodon = request.resourceURL.pathname?.endsWith('/api/v1/streaming');
if (isMastodon && !request.resourceURL.pathname?.startsWith('/mastodon')) {
streamingLogger.warn(`Received connect from mastodon on incorrect path: ${request.resourceURL.pathname}`);
}
const main = isMastodon
? new MastodonStreamingConnection(connection, ev, user, app, q)
: new MainStreamConnection(connection, ev, user, app, host, accessToken, prepareStream);
const intervalId = user
? setInterval(() => {

View file

@ -1,5 +1,5 @@
import { Brackets, In } from "typeorm";
import { publishNoteStream } from "@/services/stream.js";
import { publishNoteStream, publishNoteUpdatesStream } from "@/services/stream.js";
import renderDelete from "@/remote/activitypub/renderer/delete.js";
import renderAnnounce from "@/remote/activitypub/renderer/announce.js";
import renderUndo from "@/remote/activitypub/renderer/undo.js";
@ -53,6 +53,8 @@ export default async function (
deletedAt: deletedAt,
});
publishNoteUpdatesStream("deleted", note);
//#region ローカルの投稿なら削除アクティビティを配送
if (Users.isLocalUser(user) && !note.localOnly) {
let renote: Note | null = null;

View file

@ -1,6 +1,6 @@
import * as mfm from "mfm-js";
import {
publishNoteStream,
publishNoteStream, publishNoteUpdatesStream,
} from "@/services/stream.js";
import DeliverManager from "@/remote/activitypub/deliver-manager.js";
import renderNote from "@/remote/activitypub/renderer/note.js";
@ -172,6 +172,8 @@ export default async function (
updatedAt: update.updatedAt,
});
publishNoteUpdatesStream("updated", note);
(async () => {
const noteActivity = await renderNote(note, false);
noteActivity.updated = note.updatedAt.toISOString();

View file

@ -20,7 +20,7 @@ import type {
MessagingStreamTypes,
NoteStreamTypes,
UserListStreamTypes,
UserStreamTypes,
UserStreamTypes, NoteUpdatesStreamTypes,
} from "@/server/api/stream/types.js";
class Publisher {
@ -104,10 +104,19 @@ class Publisher {
type: K,
value?: NoteStreamTypes[K],
): void => {
this.publish(`noteStream:${noteId}`, type, {
const object = {
id: noteId,
body: value,
});
};
this.publish(`noteStream:${noteId}`, type, object);
};
public publishNoteUpdatesStream = <K extends keyof NoteUpdatesStreamTypes>(
type: K,
value: NoteUpdatesStreamTypes[K],
): void => {
this.publish('noteUpdatesStream', type, value);
};
public publishChannelStream = <K extends keyof ChannelStreamTypes>(
@ -215,6 +224,7 @@ export const publishMainStream = publisher.publishMainStream;
export const publishDriveStream = publisher.publishDriveStream;
export const publishNoteStream = publisher.publishNoteStream;
export const publishNotesStream = publisher.publishNotesStream;
export const publishNoteUpdatesStream = publisher.publishNoteUpdatesStream;
export const publishChannelStream = publisher.publishChannelStream;
export const publishUserListStream = publisher.publishUserListStream;
export const publishAntennaStream = publisher.publishAntennaStream;